You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2020/05/13 02:46:30 UTC

[rocketmq-site] branch master updated: fix java code block

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-site.git


The following commit(s) were added to refs/heads/master by this push:
     new c488486  fix java code block
     new 9f80be3  Merge pull request #67 from web1992/master
c488486 is described below

commit c488486e4d85ad3928bb5c0c580b111a4535c4a9
Author: web1992 <55...@qq.com>
AuthorDate: Fri Apr 24 23:17:42 2020 +0800

    fix java code block
---
 _docs/25-rmq-transaction-example.md | 152 ++++++++++++++++++------------------
 1 file changed, 77 insertions(+), 75 deletions(-)

diff --git a/_docs/25-rmq-transaction-example.md b/_docs/25-rmq-transaction-example.md
index 1d8592a..5a29688 100644
--- a/_docs/25-rmq-transaction-example.md
+++ b/_docs/25-rmq-transaction-example.md
@@ -37,88 +37,90 @@ Transactional message ensures that the execution of local transaction and the se
 
   (1)Create the transactional producer  
    Use TransactionMQProducer class to create producer client, and specify a unique producerGroup, and you can set up a custom thread pool to process check requests. After executing the local transaction, you need to reply to MQ according to the execution result,and the reply status is described in the above section.  
-       
-    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-    import org.apache.rocketmq.common.message.MessageExt;
-    import java.util.List;
-    
-    public class TransactionProducer {
-        public static void main(String[] args) throws MQClientException, InterruptedException {
-            TransactionListener transactionListener = new TransactionListenerImpl();
-            TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
-            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable r) {
-                    Thread thread = new Thread(r);
-                    thread.setName("client-transaction-msg-check-thread");
-                    return thread;
-                }
-            });
-    
-            producer.setExecutorService(executorService);
-            producer.setTransactionListener(transactionListener);
-            producer.start();
-    
-            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
-            for (int i = 0; i < 10; i++) {
-                try {
-                    Message msg =
-                        new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
-                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
-                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
-                    System.out.printf("%s%n", sendResult);
-    
-                    Thread.sleep(10);
-                } catch (MQClientException | UnsupportedEncodingException e) {
-                    e.printStackTrace();
-                }
+
+```java
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import java.util.List;
+
+public class TransactionProducer {
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+        TransactionListener transactionListener = new TransactionListenerImpl();
+        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
+        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread thread = new Thread(r);
+                thread.setName("client-transaction-msg-check-thread");
+                return thread;
             }
-    
-            for (int i = 0; i < 100000; i++) {
-                Thread.sleep(1000);
+        });
+
+        producer.setExecutorService(executorService);
+        producer.setTransactionListener(transactionListener);
+        producer.start();
+
+        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
+        for (int i = 0; i < 10; i++) {
+            try {
+                Message msg =
+                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
+                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
+                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
+                System.out.printf("%s%n", sendResult);
+
+                Thread.sleep(10);
+            } catch (MQClientException | UnsupportedEncodingException e) {
+                e.printStackTrace();
             }
-            producer.shutdown();
         }
+
+        for (int i = 0; i < 100000; i++) {
+            Thread.sleep(1000);
+        }
+        producer.shutdown();
     }
-    ```
+}
+ ```
 
     
   (2)Implement the TransactionListener interface  
    The "executeLocalTransaction" method is used to execute local transaction when send half message succeed. It returns one of three transaction status mentioned in the previous section.  
    The "checkLocalTransaction" method is used to check the local transaction status and respond to MQ check requests. It also returns one of three transaction status mentioned in the previous section.  
 
-       import ...
-       
-       public class TransactionListenerImpl implements TransactionListener {
-           private AtomicInteger transactionIndex = new AtomicInteger(0);
-       
-           private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
-       
-           @Override
-           public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
-               int value = transactionIndex.getAndIncrement();
-               int status = value % 3;
-               localTrans.put(msg.getTransactionId(), status);
-               return LocalTransactionState.UNKNOW;
-           }
-       
-           @Override
-           public LocalTransactionState checkLocalTransaction(MessageExt msg) {
-               Integer status = localTrans.get(msg.getTransactionId());
-               if (null != status) {
-                   switch (status) {
-                       case 0:
-                           return LocalTransactionState.UNKNOW;
-                       case 1:
-                           return LocalTransactionState.COMMIT_MESSAGE;
-                       case 2:
-                           return LocalTransactionState.ROLLBACK_MESSAGE;
-                   }
-               }
-               return LocalTransactionState.COMMIT_MESSAGE;
-           }
-       }
-       ```
\ No newline at end of file
+```java
+import ...
+
+public class TransactionListenerImpl implements TransactionListener {
+    private AtomicInteger transactionIndex = new AtomicInteger(0);
+
+    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
+
+    @Override
+    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+        int value = transactionIndex.getAndIncrement();
+        int status = value % 3;
+        localTrans.put(msg.getTransactionId(), status);
+        return LocalTransactionState.UNKNOW;
+    }
+
+    @Override
+    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+        Integer status = localTrans.get(msg.getTransactionId());
+        if (null != status) {
+            switch (status) {
+                case 0:
+                    return LocalTransactionState.UNKNOW;
+                case 1:
+                    return LocalTransactionState.COMMIT_MESSAGE;
+                case 2:
+                    return LocalTransactionState.ROLLBACK_MESSAGE;
+            }
+        }
+        return LocalTransactionState.COMMIT_MESSAGE;
+    }
+}
+```