You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/27 09:26:51 UTC

[1/2] incubator-rocketmq git commit: Allow setting base factor for commercial data.

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/master e5892e164 -> 1356e35f4


Allow setting base factor for commercial data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/0c022e05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/0c022e05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/0c022e05

Branch: refs/heads/master
Commit: 0c022e05af86fa53c71bcb92b93a8a2b6fb82908
Parents: e5892e1
Author: yukon <yu...@apache.org>
Authored: Tue Dec 27 16:38:25 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Dec 27 16:38:25 2016 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/processor/PullMessageProcessor.java     | 4 +++-
 .../rocketmq/broker/processor/SendMessageProcessor.java     | 3 ++-
 .../main/java/com/alibaba/rocketmq/common/BrokerConfig.java | 9 +++++++++
 3 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
index 0152b93..1257f18 100644
--- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
@@ -299,9 +299,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
 
                 switch (response.getCode()) {
                     case ResponseCode.SUCCESS:
+                        int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+                        int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
 
                         context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
-                        context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial());
+                        context.setCommercialRcvTimes(incValue);
                         context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                         context.setCommercialOwner(owner);
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
index 414b3f4..a375285 100644
--- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
@@ -428,8 +428,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                     sendMessageContext.setQueueId(responseHeader.getQueueId());
                     sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
 
+                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                     int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
-                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
 
                     sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                     sendMessageContext.setCommercialSendTimes(incValue);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
index 6eae0a7..ba80a3f 100644
--- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
@@ -85,6 +85,7 @@ public class BrokerConfig {
     private int commercialTimerCount = 1;
     private int commercialTransCount = 1;
     private int commercialBigCount = 1;
+    private int commercialBaseCount = 1;
 
     private boolean transferMsgByHeap = true;
     private int maxDelayTime = 40;
@@ -537,4 +538,12 @@ public class BrokerConfig {
     public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
         this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
     }
+
+    public int getCommercialBaseCount() {
+        return commercialBaseCount;
+    }
+
+    public void setCommercialBaseCount(int commercialBaseCount) {
+        this.commercialBaseCount = commercialBaseCount;
+    }
 }


[2/2] incubator-rocketmq git commit: MASTER [ROCKETMQ-14] invoke callback should be invoked in an executor rather than in current thread, closes apache/incubator-rocketmq#2

Posted by yu...@apache.org.
MASTER [ROCKETMQ-14] invoke callback should be invoked in an executor rather than in current thread, closes apache/incubator-rocketmq#2


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/1356e35f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/1356e35f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/1356e35f

Branch: refs/heads/master
Commit: 1356e35f45ebfbca27930fe06e7abd659f111fb4
Parents: 0c022e0
Author: Jaskey <li...@gmail.com>
Authored: Tue Dec 27 17:26:05 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Dec 27 17:26:05 2016 +0800

----------------------------------------------------------------------
 .../remoting/netty/NettyRemotingAbstract.java   | 67 +++++++++++---------
 .../rocketmq/remoting/NettyConnectionTest.java  | 52 +++++++++++++++
 2 files changed, 88 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 70ae5b5..1c3fdc5 100644
--- a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -198,35 +198,7 @@ public abstract class NettyRemotingAbstract {
             responseTable.remove(opaque);
 
             if (responseFuture.getInvokeCallback() != null) {
-                boolean runInThisThread = false;
-                ExecutorService executor = this.getCallbackExecutor();
-                if (executor != null) {
-                    try {
-                        executor.submit(new Runnable() {
-                            @Override
-                            public void run() {
-                                try {
-                                    responseFuture.executeInvokeCallback();
-                                } catch (Throwable e) {
-                                    PLOG.warn("execute callback in executor exception, and callback throw", e);
-                                }
-                            }
-                        });
-                    } catch (Exception e) {
-                        runInThisThread = true;
-                        PLOG.warn("execute callback in executor exception, maybe executor busy", e);
-                    }
-                } else {
-                    runInThisThread = true;
-                }
-
-                if (runInThisThread) {
-                    try {
-                        responseFuture.executeInvokeCallback();
-                    } catch (Throwable e) {
-                        PLOG.warn("executeInvokeCallback Exception", e);
-                    }
-                }
+                executeInvokeCallback(responseFuture);
             } else {
                 responseFuture.putResponse(cmd);
             }
@@ -236,6 +208,39 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    //execute callback in callback executor. If callback executor is null, run directly in current thread
+    private void executeInvokeCallback(final ResponseFuture responseFuture) {
+        boolean runInThisThread = false;
+        ExecutorService executor = this.getCallbackExecutor();
+        if (executor != null) {
+            try {
+                executor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            responseFuture.executeInvokeCallback();
+                        } catch (Throwable e) {
+                            PLOG.warn("execute callback in executor exception, and callback throw", e);
+                        }
+                    }
+                });
+            } catch (Exception e) {
+                runInThisThread = true;
+                PLOG.warn("execute callback in executor exception, maybe executor busy", e);
+            }
+        } else {
+            runInThisThread = true;
+        }
+
+        if (runInThisThread) {
+            try {
+                responseFuture.executeInvokeCallback();
+            } catch (Throwable e) {
+                PLOG.warn("executeInvokeCallback Exception", e);
+            }
+        }
+    }
+
     public abstract RPCHook getRPCHook();
 
     abstract public ExecutorService getCallbackExecutor();
@@ -257,7 +262,7 @@ public abstract class NettyRemotingAbstract {
 
         for (ResponseFuture rf : rfList) {
             try {
-                rf.executeInvokeCallback();
+                executeInvokeCallback(rf);
             } catch (Throwable e) {
                 PLOG.warn("scanResponseTable, operationComplete Exception", e);
             }
@@ -329,7 +334,7 @@ public abstract class NettyRemotingAbstract {
                         responseFuture.putResponse(null);
                         responseTable.remove(opaque);
                         try {
-                            responseFuture.executeInvokeCallback();
+                            executeInvokeCallback(responseFuture);
                         } catch (Throwable e) {
                             PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
index e4ff948..755d332 100644
--- a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
@@ -22,9 +22,15 @@ import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
 import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
 import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
 import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
+import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
 import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 
 /**
 
@@ -51,6 +57,52 @@ public class NettyConnectionTest {
         System.out.println("-----------------------------------------------------------------");
     }
 
+
+    @Test
+    public void test_async_timeout() throws InterruptedException, RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException {
+        RemotingClient client = createRemotingClient();
+        final AtomicInteger ai = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(100);
+        for(int i=0;i<100;i++) {
+            try {
+                RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+                client.invokeAsync("localhost:8888", request, 5, new InvokeCallback() {//very easy to timeout
+                    @Override
+                    public void operationComplete(ResponseFuture responseFuture) {
+                        if (responseFuture.isTimeout()) {
+                            if(ai.getAndIncrement()==4) {
+                                try {
+                                    System.out.println("First try timeout,  blocking 10s" + Thread.currentThread().getName());
+                                    Thread.sleep(10 * 1000);
+                                } catch (InterruptedException e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                            else{
+                                System.out.println("Timeout callback execute,very short."+Thread.currentThread().getName());
+                            }
+                        }
+                        else{
+                            System.out.println("Success."+Thread.currentThread().getName());
+                        }
+                        latch.countDown();
+
+                    }
+                });
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+
+
+        latch.await(1000, TimeUnit.MILLISECONDS);
+        Assert.assertEquals(1, latch.getCount());//only one should be blocked
+        client.shutdown();
+        System.out.println("-----------------------------------------------------------------");
+    }
+
     public static RemotingClient createRemotingClient() {
         NettyClientConfig config = new NettyClientConfig();
         config.setClientChannelMaxIdleTimeSeconds(15);