You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/27 09:26:51 UTC
[1/2] incubator-rocketmq git commit: Allow setting base factor for
commercial data.
Repository: incubator-rocketmq
Updated Branches:
refs/heads/master e5892e164 -> 1356e35f4
Allow setting base factor for commercial data.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/0c022e05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/0c022e05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/0c022e05
Branch: refs/heads/master
Commit: 0c022e05af86fa53c71bcb92b93a8a2b6fb82908
Parents: e5892e1
Author: yukon <yu...@apache.org>
Authored: Tue Dec 27 16:38:25 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Dec 27 16:38:25 2016 +0800
----------------------------------------------------------------------
.../rocketmq/broker/processor/PullMessageProcessor.java | 4 +++-
.../rocketmq/broker/processor/SendMessageProcessor.java | 3 ++-
.../main/java/com/alibaba/rocketmq/common/BrokerConfig.java | 9 +++++++++
3 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
index 0152b93..1257f18 100644
--- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
@@ -299,9 +299,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
switch (response.getCode()) {
case ResponseCode.SUCCESS:
+ int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+ int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
- context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial());
+ context.setCommercialRcvTimes(incValue);
context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
context.setCommercialOwner(owner);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
index 414b3f4..a375285 100644
--- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
@@ -428,8 +428,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
sendMessageContext.setQueueId(responseHeader.getQueueId());
sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+ int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
- int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
index 6eae0a7..ba80a3f 100644
--- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
@@ -85,6 +85,7 @@ public class BrokerConfig {
private int commercialTimerCount = 1;
private int commercialTransCount = 1;
private int commercialBigCount = 1;
+ private int commercialBaseCount = 1;
private boolean transferMsgByHeap = true;
private int maxDelayTime = 40;
@@ -537,4 +538,12 @@ public class BrokerConfig {
public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
}
+
+ public int getCommercialBaseCount() {
+ return commercialBaseCount;
+ }
+
+ public void setCommercialBaseCount(int commercialBaseCount) {
+ this.commercialBaseCount = commercialBaseCount;
+ }
}
[2/2] incubator-rocketmq git commit: MASTER [ROCKETMQ-14] invoke
callback should be invoked in an executor rather than in the current thread,
closes apache/incubator-rocketmq#2
Posted by yu...@apache.org.
MASTER [ROCKETMQ-14] invoke callback should be invoked in an executor rather than in the current thread, closes apache/incubator-rocketmq#2
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/1356e35f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/1356e35f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/1356e35f
Branch: refs/heads/master
Commit: 1356e35f45ebfbca27930fe06e7abd659f111fb4
Parents: 0c022e0
Author: Jaskey <li...@gmail.com>
Authored: Tue Dec 27 17:26:05 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Dec 27 17:26:05 2016 +0800
----------------------------------------------------------------------
.../remoting/netty/NettyRemotingAbstract.java | 67 +++++++++++---------
.../rocketmq/remoting/NettyConnectionTest.java | 52 +++++++++++++++
2 files changed, 88 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 70ae5b5..1c3fdc5 100644
--- a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -198,35 +198,7 @@ public abstract class NettyRemotingAbstract {
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
- boolean runInThisThread = false;
- ExecutorService executor = this.getCallbackExecutor();
- if (executor != null) {
- try {
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- responseFuture.executeInvokeCallback();
- } catch (Throwable e) {
- PLOG.warn("execute callback in executor exception, and callback throw", e);
- }
- }
- });
- } catch (Exception e) {
- runInThisThread = true;
- PLOG.warn("execute callback in executor exception, maybe executor busy", e);
- }
- } else {
- runInThisThread = true;
- }
-
- if (runInThisThread) {
- try {
- responseFuture.executeInvokeCallback();
- } catch (Throwable e) {
- PLOG.warn("executeInvokeCallback Exception", e);
- }
- }
+ executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
}
@@ -236,6 +208,39 @@ public abstract class NettyRemotingAbstract {
}
}
+ //execute callback in callback executor. If callback executor is null, run directly in current thread
+ private void executeInvokeCallback(final ResponseFuture responseFuture) {
+ boolean runInThisThread = false;
+ ExecutorService executor = this.getCallbackExecutor();
+ if (executor != null) {
+ try {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ responseFuture.executeInvokeCallback();
+ } catch (Throwable e) {
+ PLOG.warn("execute callback in executor exception, and callback throw", e);
+ }
+ }
+ });
+ } catch (Exception e) {
+ runInThisThread = true;
+ PLOG.warn("execute callback in executor exception, maybe executor busy", e);
+ }
+ } else {
+ runInThisThread = true;
+ }
+
+ if (runInThisThread) {
+ try {
+ responseFuture.executeInvokeCallback();
+ } catch (Throwable e) {
+ PLOG.warn("executeInvokeCallback Exception", e);
+ }
+ }
+ }
+
public abstract RPCHook getRPCHook();
abstract public ExecutorService getCallbackExecutor();
@@ -257,7 +262,7 @@ public abstract class NettyRemotingAbstract {
for (ResponseFuture rf : rfList) {
try {
- rf.executeInvokeCallback();
+ executeInvokeCallback(rf);
} catch (Throwable e) {
PLOG.warn("scanResponseTable, operationComplete Exception", e);
}
@@ -329,7 +334,7 @@ public abstract class NettyRemotingAbstract {
responseFuture.putResponse(null);
responseTable.remove(opaque);
try {
- responseFuture.executeInvokeCallback();
+ executeInvokeCallback(responseFuture);
} catch (Throwable e) {
PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
index e4ff948..755d332 100644
--- a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
@@ -22,9 +22,15 @@ import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
+import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
@@ -51,6 +57,52 @@ public class NettyConnectionTest {
System.out.println("-----------------------------------------------------------------");
}
+
+ @Test
+ public void test_async_timeout() throws InterruptedException, RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException {
+ RemotingClient client = createRemotingClient();
+ final AtomicInteger ai = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(100);
+ for(int i=0;i<100;i++) {
+ try {
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+ client.invokeAsync("localhost:8888", request, 5, new InvokeCallback() {//very easy to timeout
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ if (responseFuture.isTimeout()) {
+ if(ai.getAndIncrement()==4) {
+ try {
+ System.out.println("First try timeout, blocking 10s" + Thread.currentThread().getName());
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ else{
+ System.out.println("Timeout callback execute,very short."+Thread.currentThread().getName());
+ }
+ }
+ else{
+ System.out.println("Success."+Thread.currentThread().getName());
+ }
+ latch.countDown();
+
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+
+
+ latch.await(1000, TimeUnit.MILLISECONDS);
+ Assert.assertEquals(1, latch.getCount());//only one should be blocked
+ client.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+
public static RemotingClient createRemotingClient() {
NettyClientConfig config = new NettyClientConfig();
config.setClientChannelMaxIdleTimeSeconds(15);