You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/08/21 03:08:40 UTC

[GitHub] dongeforever closed pull request #398: [ISSUE #395]Resolve compatibility issues and keep consistent with the old API

dongeforever closed pull request #398: [ISSUE #395]Resolve compatibility issues and keep consistent with the old API
URL: https://github.com/apache/rocketmq/pull/398
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index e1d9f9042..d095254a6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -16,6 +16,22 @@
  */
 package org.apache.rocketmq.client.impl.producer;
 
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -31,11 +47,13 @@
 import org.apache.rocketmq.client.latency.MQFaultStrategy;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
 import org.apache.rocketmq.client.producer.TransactionListener;
 import org.apache.rocketmq.client.producer.TransactionMQProducer;
 import org.apache.rocketmq.client.producer.TransactionSendResult;
@@ -65,23 +83,6 @@
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import java.util.concurrent.RejectedExecutionException;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 public class DefaultMQProducerImpl implements MQProducerInner {
     private final InternalLogger log = ClientLogger.getLog();
@@ -120,10 +121,10 @@ public void initTransactionEnv() {
         if (producer.getExecutorService() != null) {
             this.checkExecutor = producer.getExecutorService();
         } else {
-            this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000);
+            this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
             this.checkExecutor = new ThreadPoolExecutor(
-                1,
-                1,
+                producer.getCheckThreadPoolMinSize(),
+                producer.getCheckThreadPoolMaxSize(),
                 1000 * 60,
                 TimeUnit.MILLISECONDS,
                 this.checkRequestQueue);
@@ -131,8 +132,9 @@ public void initTransactionEnv() {
     }
 
     public void destroyTransactionEnv() {
-        this.checkExecutor.shutdown();
-        this.checkRequestQueue.clear();
+        if (this.checkExecutor != null) {
+            this.checkExecutor.shutdown();
+        }
     }
 
     public void registerSendMessageHook(final SendMessageHook hook) {
@@ -243,13 +245,27 @@ public boolean isPublishTopicNeedUpdate(String topic) {
         return null == prev || !prev.ok();
     }
 
+    /**
+     * This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
+     * @return
+     */
+    @Override
+    @Deprecated
+    public TransactionCheckListener checkListener() {
+        if (this.defaultMQProducer instanceof TransactionMQProducer) {
+            TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
+            return producer.getTransactionCheckListener();
+        }
+
+        return null;
+    }
+
     @Override
-    public TransactionListener checkListener() {
+    public TransactionListener getCheckListener() {
         if (this.defaultMQProducer instanceof TransactionMQProducer) {
             TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
             return producer.getTransactionListener();
         }
-
         return null;
     }
 
@@ -264,12 +280,20 @@ public void checkTransactionState(final String addr, final MessageExt msg,
 
             @Override
             public void run() {
-                TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
-                if (transactionCheckListener != null) {
+                TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
+                TransactionListener transactionListener = getCheckListener();
+                if (transactionCheckListener != null || transactionListener != null) {
                     LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                     Throwable exception = null;
                     try {
-                        localTransactionState = transactionCheckListener.checkLocalTransaction(message);
+                        if (transactionCheckListener != null) {
+                            localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
+                        } else if (transactionListener != null) {
+                            log.debug("Used new check API in transaction message");
+                            localTransactionState = transactionListener.checkLocalTransaction(message);
+                        } else {
+                            log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
+                        }
                     } catch (Throwable e) {
                         log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                         exception = e;
@@ -280,7 +304,7 @@ public void run() {
                         group,
                         exception);
                 } else {
-                    log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
+                    log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
                 }
             }
 
@@ -1096,9 +1120,10 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
     }
 
     public TransactionSendResult sendMessageInTransaction(final Message msg,
-                                                          final TransactionListener tranExecuter, final Object arg)
+                                                          final LocalTransactionExecuter localTransactionExecuter, final Object arg)
         throws MQClientException {
-        if (null == tranExecuter) {
+        TransactionListener transactionListener = getCheckListener();
+        if (null == localTransactionExecuter && null == transactionListener) {
             throw new MQClientException("tranExecutor is null", null);
         }
         Validators.checkMessage(msg, this.defaultMQProducer);
@@ -1124,7 +1149,12 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
                     if (null != transactionId && !"".equals(transactionId)) {
                         msg.setTransactionId(transactionId);
                     }
-                    localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
+                    if (null != localTransactionExecuter) {
+                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
+                    } else if (transactionListener != null) {
+                        log.debug("Used new transaction API");
+                        transactionListener.executeLocalTransaction(msg, arg);
+                    }
                     if (null == localTransactionState) {
                         localTransactionState = LocalTransactionState.UNKNOW;
                     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
index 52ebe1b57..acfd7b1f2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
@@ -16,18 +16,19 @@
  */
 package org.apache.rocketmq.client.impl.producer;
 
+import java.util.Set;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
 import org.apache.rocketmq.client.producer.TransactionListener;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
 
-import java.util.Set;
-
 public interface MQProducerInner {
     Set<String> getPublishTopicList();
 
     boolean isPublishTopicNeedUpdate(final String topic);
 
-    TransactionListener checkListener();
+    TransactionCheckListener checkListener();
+    TransactionListener getCheckListener();
 
     void checkTransactionState(
         final String addr,
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 065f068c7..9732d0eb8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -464,16 +464,31 @@ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
      * This method is to send transactional messages.
      *
      * @param msg Transactional message to send.
+     * @param tranExecuter local transaction executor.
      * @param arg Argument used along with local transaction executor.
      * @return Transaction result.
      * @throws MQClientException if there is any client error.
      */
     @Override
-    public TransactionSendResult sendMessageInTransaction(Message msg, final Object arg)
+    public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
+        final Object arg)
         throws MQClientException {
         throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
     }
 
+    /**
+     * This method is used to send transactional messages.
+     * @param msg Transactional message to send.
+     * @param arg Argument used along with local transaction executor.
+     * @return Transaction result.
+     * @throws MQClientException
+     */
+    @Override
+    public TransactionSendResult sendMessageInTransaction(Message msg,
+        Object arg) throws MQClientException {
+        throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
+    }
+
     /**
      * Create a topic on broker.
      *
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
new file mode 100644
index 000000000..28789b91d
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.Message;
+
+/**
+ * This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
+ */
+@Deprecated
+public interface LocalTransactionExecuter {
+    LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 0776ee155..1af600574 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -80,7 +80,11 @@ void send(final Message msg, final MessageQueueSelector selector, final Object a
     void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
         throws MQClientException, RemotingException, InterruptedException;
 
-    TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException;
+    TransactionSendResult sendMessageInTransaction(final Message msg,
+        final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
+
+    TransactionSendResult sendMessageInTransaction(final Message msg,
+        final Object arg) throws MQClientException;
 
     //for batch
     SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java
new file mode 100644
index 000000000..2d7cf5819
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+/**
+ * This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
+ */
+@Deprecated
+public interface TransactionCheckListener {
+    LocalTransactionState checkLocalTransactionState(final MessageExt msg);
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java
index c750e5384..233af69bc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java
@@ -37,4 +37,4 @@
      * @return Transaction state
      */
     LocalTransactionState checkLocalTransaction(final MessageExt msg);
-}
+}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
index c4f122c58..8f6428b29 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -16,17 +16,21 @@
  */
 package org.apache.rocketmq.client.producer;
 
+import java.util.concurrent.ExecutorService;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.RPCHook;
 
-import java.util.concurrent.ExecutorService;
-
 public class TransactionMQProducer extends DefaultMQProducer {
-    private TransactionListener transactionListener;
+    private TransactionCheckListener transactionCheckListener;
+    private int checkThreadPoolMinSize = 1;
+    private int checkThreadPoolMaxSize = 1;
+    private int checkRequestHoldMax = 2000;
 
     private ExecutorService executorService;
 
+    private TransactionListener transactionListener;
+
     public TransactionMQProducer() {
     }
 
@@ -50,21 +54,77 @@ public void shutdown() {
         this.defaultMQProducerImpl.destroyTransactionEnv();
     }
 
+    /**
+     * This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>}
+     * is recommended.
+     */
+    @Override
+    @Deprecated
+    public TransactionSendResult sendMessageInTransaction(final Message msg,
+        final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
+        if (null == this.transactionCheckListener) {
+            throw new MQClientException("localTransactionBranchCheckListener is null", null);
+        }
+
+        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
+    }
+
     @Override
-    public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
+    public TransactionSendResult sendMessageInTransaction(final Message msg,
+        final Object arg) throws MQClientException {
         if (null == this.transactionListener) {
             throw new MQClientException("TransactionListener is null", null);
         }
 
-        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg);
+        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
     }
 
-    public TransactionListener getTransactionListener() {
-        return transactionListener;
+    public TransactionCheckListener getTransactionCheckListener() {
+        return transactionCheckListener;
     }
 
-    public void setTransactionListener(TransactionListener transactionListener) {
-        this.transactionListener = transactionListener;
+    /**
+     * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
+     */
+    @Deprecated
+    public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
+        this.transactionCheckListener = transactionCheckListener;
+    }
+
+    public int getCheckThreadPoolMinSize() {
+        return checkThreadPoolMinSize;
+    }
+
+    /**
+     * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
+     */
+    @Deprecated
+    public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
+        this.checkThreadPoolMinSize = checkThreadPoolMinSize;
+    }
+
+    public int getCheckThreadPoolMaxSize() {
+        return checkThreadPoolMaxSize;
+    }
+
+    /**
+     * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
+     */
+    @Deprecated
+    public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
+        this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
+    }
+
+    public int getCheckRequestHoldMax() {
+        return checkRequestHoldMax;
+    }
+
+    /**
+     * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
+     */
+    @Deprecated
+    public void setCheckRequestHoldMax(int checkRequestHoldMax) {
+        this.checkRequestHoldMax = checkRequestHoldMax;
     }
 
     public ExecutorService getExecutorService() {
@@ -74,4 +134,12 @@ public ExecutorService getExecutorService() {
     public void setExecutorService(ExecutorService executorService) {
         this.executorService = executorService;
     }
+
+    public TransactionListener getTransactionListener() {
+        return transactionListener;
+    }
+
+    public void setTransactionListener(TransactionListener transactionListener) {
+        this.transactionListener = transactionListener;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 442f456aa..9b713d12a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -150,13 +150,13 @@
      * that can be checked.
      */
     @ImportantField
-    private long transactionTimeOut = 3 * 1000;
+    private long transactionTimeOut = 6 * 1000;
 
     /**
      * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
      */
     @ImportantField
-    private int transactionCheckMax = 5;
+    private int transactionCheckMax = 15;
 
     /**
      * Transaction message check interval.
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index ae97cc97d..20d186764 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -57,7 +57,7 @@
     public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
     //http://jmenv.tbsite.net:8080/rocketmq/nsaddr
     //public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
-    public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "AUTO_CREATE_TOPIC_KEY"; // Will be created at broker when isAutoCreateTopicEnable
+    public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable
     public static final String BENCHMARK_TOPIC = "BenchmarkTest";
     public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
     public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 2d8a5fead..d9fafdd08 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -17,15 +17,6 @@
 
 package org.apache.rocketmq.example.benchmark;
 
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.LocalTransactionState;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.client.producer.TransactionListener;
-import org.apache.rocketmq.client.producer.TransactionMQProducer;
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-
 import java.io.UnsupportedEncodingException;
 import java.util.LinkedList;
 import java.util.Timer;
@@ -33,18 +24,27 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.TransactionCheckListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
 
 public class TransactionProducer {
     private static int threadCount;
     private static int messageSize;
-    private static boolean isCheck;
-    private static boolean isCheckFalse;
+    private static boolean ischeck;
+    private static boolean ischeckffalse;
 
     public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
         threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
         messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
-        isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]);
-        isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
+        ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
+        ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
 
         final Message msg = buildMessage(messageSize);
 
@@ -73,8 +73,8 @@ private void printStats() {
                     Long[] end = snapshotList.getLast();
 
                     final long sendTps =
-                        (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L);
-                    final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]);
+                        (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
+                    final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
 
                     System.out.printf(
                         "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
@@ -92,14 +92,16 @@ public void run() {
             }
         }, 10000, 10000);
 
-        final TransactionListener transactionListener =
-            new TransactionListenerImpl(isCheckFalse, isCheck, statsBenchmark);
+        final TransactionCheckListener transactionCheckListener =
+            new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
         final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
         producer.setInstanceName(Long.toString(System.currentTimeMillis()));
-        producer.setTransactionListener(transactionListener);
+        producer.setTransactionCheckListener(transactionCheckListener);
         producer.setDefaultTopicQueueNums(1000);
         producer.start();
 
+        final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
+
         for (int i = 0; i < threadCount; i++) {
             sendThreadPool.execute(new Runnable() {
                 @Override
@@ -109,7 +111,7 @@ public void run() {
                             // Thread.sleep(1000);
                             final long beginTimestamp = System.currentTimeMillis();
                             SendResult sendResult =
-                                producer.sendMessageInTransaction(msg, null);
+                                producer.sendMessageInTransaction(msg, tranExecuter, null);
                             if (sendResult != null) {
                                 statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
                                 statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
@@ -122,7 +124,8 @@ public void run() {
                                 boolean updated =
                                     statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
                                         currentRT);
-                                if (updated) { break; }
+                                if (updated)
+                                    break;
 
                                 prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
                             }
@@ -150,37 +153,43 @@ private static Message buildMessage(final int messageSize) throws UnsupportedEnc
     }
 }
 
+class TransactionExecuterBImpl implements LocalTransactionExecuter {
 
-class TransactionListenerImpl implements TransactionListener {
-    private boolean isCheckFalse;
+    private boolean ischeck;
+
+    public TransactionExecuterBImpl(boolean ischeck) {
+        this.ischeck = ischeck;
+    }
+
+    @Override
+    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
+        if (ischeck) {
+            return LocalTransactionState.UNKNOW;
+        }
+        return LocalTransactionState.COMMIT_MESSAGE;
+    }
+}
+
+class TransactionCheckListenerBImpl implements TransactionCheckListener {
+    private boolean ischeckffalse;
     private StatsBenchmarkTProducer statsBenchmarkTProducer;
-    private boolean isCheckLocal;
 
-    public TransactionListenerImpl(boolean isCheckFalse, boolean isCheckLocal,
-                                   StatsBenchmarkTProducer statsBenchmarkTProducer) {
-        this.isCheckFalse = isCheckFalse;
-        this.isCheckLocal = isCheckLocal;
+    public TransactionCheckListenerBImpl(boolean ischeckffalse,
+        StatsBenchmarkTProducer statsBenchmarkTProducer) {
+        this.ischeckffalse = ischeckffalse;
         this.statsBenchmarkTProducer = statsBenchmarkTProducer;
     }
 
     @Override
-    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
         statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
-        if (isCheckFalse) {
+        if (ischeckffalse) {
 
             return LocalTransactionState.ROLLBACK_MESSAGE;
         }
 
         return LocalTransactionState.COMMIT_MESSAGE;
     }
-
-    @Override
-    public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) {
-        if (isCheckLocal) {
-            return LocalTransactionState.UNKNOW;
-        }
-        return LocalTransactionState.COMMIT_MESSAGE;
-    }
 }
 
 class StatsBenchmarkTProducer {
diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java
index ce471d2ce..cb066d21d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java
+++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java
@@ -48,6 +48,8 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                     return LocalTransactionState.COMMIT_MESSAGE;
                 case 2:
                     return LocalTransactionState.ROLLBACK_MESSAGE;
+                default:
+                    return LocalTransactionState.COMMIT_MESSAGE;
             }
         }
         return LocalTransactionState.COMMIT_MESSAGE;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services