You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 06:06:50 UTC
[01/10] incubator-rocketmq git commit: ROCKETMQ-2 Closed the selector.
Repository: incubator-rocketmq
Updated Branches:
refs/heads/spec ccdbdf575 -> 13cba1884
ROCKETMQ-2 Closed the selector.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/b5afe91d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/b5afe91d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/b5afe91d
Branch: refs/heads/spec
Commit: b5afe91df86c09187771bd89c28b9f51d69ccf01
Parents: 774101d
Author: shroman <rs...@yahoo.com>
Authored: Sun Dec 25 18:38:12 2016 +0900
Committer: Willem Jiang <wi...@gmail.com>
Committed: Sun Dec 25 20:59:19 2016 +0800
----------------------------------------------------------------------
.../com/alibaba/rocketmq/broker/BrokerControllerTest.java | 9 ++++++++-
.../main/java/com/alibaba/rocketmq/store/ha/HAService.java | 5 +++--
2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b5afe91d/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
index 6b0b62d..9246d6f 100644
--- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
@@ -22,11 +22,15 @@ import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author shtykh_roman
*/
public class BrokerControllerTest {
+ protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class);
+
private static final int RESTART_NUM = 3;
/**
@@ -44,10 +48,13 @@ public class BrokerControllerTest {
new NettyClientConfig(), //
new MessageStoreConfig());
boolean initResult = brokerController.initialize();
- System.out.println("initialize " + initResult);
+ logger.info("Broker is initialized " + initResult);
+
brokerController.start();
+ logger.info("Broker is started");
brokerController.shutdown();
+ logger.info("Broker is stopped");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b5afe91d/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
index 5f93753..075252c 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
@@ -173,7 +173,7 @@ public class HAService {
class AcceptSocketService extends ServiceThread {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
- private SocketAddress socketAddressListen;
+ private final SocketAddress socketAddressListen;
public AcceptSocketService(final int port) {
@@ -194,7 +194,8 @@ public class HAService {
public void shutdown(final boolean interrupt) {
super.shutdown(interrupt);
try {
- serverSocketChannel.close();
+ this.serverSocketChannel.close();
+ this.selector.close();
}
catch (IOException e) {
log.error("AcceptSocketService shutdown exception", e);
[06/10] incubator-rocketmq git commit: [ROCKETMQ-3] Remove
try...catch, using assertNull etc
Posted by vi...@apache.org.
[ROCKETMQ-3] Remove try...catch, using assertNull etc
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/626990cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/626990cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/626990cf
Branch: refs/heads/spec
Commit: 626990cf352e4832b2dfc4d96e710d43723849a2
Parents: 03880d7
Author: dongeforever <zh...@yeah.net>
Authored: Mon Dec 26 10:31:45 2016 +0800
Committer: lollipop <lo...@apache.org>
Committed: Tue Dec 27 14:53:17 2016 +0800
----------------------------------------------------------------------
.../java/com/alibaba/rocketmq/broker/BrokerControllerTest.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/626990cf/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
index 9246d6f..b661385 100644
--- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
@@ -21,6 +21,7 @@ import com.alibaba.rocketmq.common.BrokerConfig;
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +49,8 @@ public class BrokerControllerTest {
new NettyClientConfig(), //
new MessageStoreConfig());
boolean initResult = brokerController.initialize();
+ Assert.assertTrue(initResult);
logger.info("Broker is initialized " + initResult);
-
brokerController.start();
logger.info("Broker is started");
[08/10] incubator-rocketmq git commit: MASTER [ROCKETMQ-14] invoke
callback shoule be invoked in an executor rather than in current thread,
closes apache/incubator-rocketmq#2
Posted by vi...@apache.org.
MASTER [ROCKETMQ-14] invoke callback shoule be invoked in an executor rather than in current thread, closes apache/incubator-rocketmq#2
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/1356e35f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/1356e35f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/1356e35f
Branch: refs/heads/spec
Commit: 1356e35f45ebfbca27930fe06e7abd659f111fb4
Parents: 0c022e0
Author: Jaskey <li...@gmail.com>
Authored: Tue Dec 27 17:26:05 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Dec 27 17:26:05 2016 +0800
----------------------------------------------------------------------
.../remoting/netty/NettyRemotingAbstract.java | 67 +++++++++++---------
.../rocketmq/remoting/NettyConnectionTest.java | 52 +++++++++++++++
2 files changed, 88 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 70ae5b5..1c3fdc5 100644
--- a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -198,35 +198,7 @@ public abstract class NettyRemotingAbstract {
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
- boolean runInThisThread = false;
- ExecutorService executor = this.getCallbackExecutor();
- if (executor != null) {
- try {
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- responseFuture.executeInvokeCallback();
- } catch (Throwable e) {
- PLOG.warn("execute callback in executor exception, and callback throw", e);
- }
- }
- });
- } catch (Exception e) {
- runInThisThread = true;
- PLOG.warn("execute callback in executor exception, maybe executor busy", e);
- }
- } else {
- runInThisThread = true;
- }
-
- if (runInThisThread) {
- try {
- responseFuture.executeInvokeCallback();
- } catch (Throwable e) {
- PLOG.warn("executeInvokeCallback Exception", e);
- }
- }
+ executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
}
@@ -236,6 +208,39 @@ public abstract class NettyRemotingAbstract {
}
}
+ //execute callback in callback executor. If callback executor is null, run directly in current thread
+ private void executeInvokeCallback(final ResponseFuture responseFuture) {
+ boolean runInThisThread = false;
+ ExecutorService executor = this.getCallbackExecutor();
+ if (executor != null) {
+ try {
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ responseFuture.executeInvokeCallback();
+ } catch (Throwable e) {
+ PLOG.warn("execute callback in executor exception, and callback throw", e);
+ }
+ }
+ });
+ } catch (Exception e) {
+ runInThisThread = true;
+ PLOG.warn("execute callback in executor exception, maybe executor busy", e);
+ }
+ } else {
+ runInThisThread = true;
+ }
+
+ if (runInThisThread) {
+ try {
+ responseFuture.executeInvokeCallback();
+ } catch (Throwable e) {
+ PLOG.warn("executeInvokeCallback Exception", e);
+ }
+ }
+ }
+
public abstract RPCHook getRPCHook();
abstract public ExecutorService getCallbackExecutor();
@@ -257,7 +262,7 @@ public abstract class NettyRemotingAbstract {
for (ResponseFuture rf : rfList) {
try {
- rf.executeInvokeCallback();
+ executeInvokeCallback(rf);
} catch (Throwable e) {
PLOG.warn("scanResponseTable, operationComplete Exception", e);
}
@@ -329,7 +334,7 @@ public abstract class NettyRemotingAbstract {
responseFuture.putResponse(null);
responseTable.remove(opaque);
try {
- responseFuture.executeInvokeCallback();
+ executeInvokeCallback(responseFuture);
} catch (Throwable e) {
PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
index e4ff948..755d332 100644
--- a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
@@ -22,9 +22,15 @@ import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
+import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
@@ -51,6 +57,52 @@ public class NettyConnectionTest {
System.out.println("-----------------------------------------------------------------");
}
+
+ @Test
+ public void test_async_timeout() throws InterruptedException, RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException {
+ RemotingClient client = createRemotingClient();
+ final AtomicInteger ai = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(100);
+ for(int i=0;i<100;i++) {
+ try {
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+ client.invokeAsync("localhost:8888", request, 5, new InvokeCallback() {//very easy to timeout
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ if (responseFuture.isTimeout()) {
+ if(ai.getAndIncrement()==4) {
+ try {
+ System.out.println("First try timeout, blocking 10s" + Thread.currentThread().getName());
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ else{
+ System.out.println("Timeout callback execute,very short."+Thread.currentThread().getName());
+ }
+ }
+ else{
+ System.out.println("Success."+Thread.currentThread().getName());
+ }
+ latch.countDown();
+
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+
+
+ latch.await(1000, TimeUnit.MILLISECONDS);
+ Assert.assertEquals(1, latch.getCount());//only one should be blocked
+ client.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+
public static RemotingClient createRemotingClient() {
NettyClientConfig config = new NettyClientConfig();
config.setClientChannelMaxIdleTimeSeconds(15);
[02/10] incubator-rocketmq git commit: ROCKETMQ-3 Clean up and
perfect the unit test with thanks to zander
Posted by vi...@apache.org.
ROCKETMQ-3 Clean up and perfect the unit test with thanks to zander
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/03880d77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/03880d77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/03880d77
Branch: refs/heads/spec
Commit: 03880d77a984403170802e9f32a568e57347e3ab
Parents: b5afe91
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Dec 26 10:38:39 2016 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Dec 26 10:38:39 2016 +0800
----------------------------------------------------------------------
.../rocketmq/broker/api/SendMessageTest.java | 32 +++++++++-----------
.../broker/topic/TopicConfigManagerTest.java | 12 ++++----
2 files changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03880d77/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
index d9babc2..cf97876 100644
--- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
@@ -63,25 +63,21 @@ public class SendMessageTest extends BrokerTestHarness{
}
@Test
- public void testSendSingle() throws Exception {
+ public void testSendSingle() throws Exception{
Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes());
- try {
- SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
- requestHeader.setProducerGroup("abc");
- requestHeader.setTopic(msg.getTopic());
- requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
- requestHeader.setDefaultTopicQueueNums(4);
- requestHeader.setQueueId(0);
- requestHeader.setSysFlag(0);
- requestHeader.setBornTimestamp(System.currentTimeMillis());
- requestHeader.setFlag(msg.getFlag());
- requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+ SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+ requestHeader.setProducerGroup("abc");
+ requestHeader.setTopic(msg.getTopic());
+ requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
+ requestHeader.setDefaultTopicQueueNums(4);
+ requestHeader.setQueueId(0);
+ requestHeader.setSysFlag(0);
+ requestHeader.setBornTimestamp(System.currentTimeMillis());
+ requestHeader.setFlag(msg.getFlag());
+ requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
- SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5,
- CommunicationMode.SYNC, new SendMessageContext(), null);
- assertTrue(result.getSendStatus() == SendStatus.SEND_OK);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5,
+ CommunicationMode.SYNC, new SendMessageContext(), null);
+ assertEquals(result.getSendStatus(), SendStatus.SEND_OK);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03880d77/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
index 7a6503f..1c93b02 100644
--- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -25,7 +25,7 @@ import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.TopicConfig;
import org.junit.Test;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
@@ -39,7 +39,7 @@ public class TopicConfigManagerTest extends BrokerTestHarness {
for (int i = 0; i < 10; i++) {
String topic = "UNITTEST-" + i;
TopicConfig topicConfig = topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0);
- assertTrue(topicConfig != null);
+ assertNotNull(topicConfig);
}
topicConfigManager.persist();
@@ -48,15 +48,15 @@ public class TopicConfigManagerTest extends BrokerTestHarness {
for (int i = 0; i < 10; i++) {
String topic = "UNITTEST-" + i;
TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
- assertTrue(topicConfig == null);
+ assertNull(topicConfig);
}
topicConfigManager.load();
for (int i = 0; i < 10; i++) {
String topic = "UNITTEST-" + i;
TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
- assertTrue(topicConfig != null);
- assertTrue(topicConfig.getTopicSysFlag() == 0);
- assertTrue(topicConfig.getReadQueueNums() == 4);
+ assertNotNull(topicConfig);
+ assertEquals(topicConfig.getTopicSysFlag(), 0);
+ assertEquals(topicConfig.getReadQueueNums(), 4);
}
}
}
[10/10] incubator-rocketmq git commit: [ROCKETMQ-17] Develop a
vendor-neutral open standard for distributed messaging: add producer sample
https://issues.apache.org/jira/browse/ROCKETMQ-17
Posted by vi...@apache.org.
[ROCKETMQ-17] Develop a vendor-neutral open standard for distributed messaging: add producer sample
https://issues.apache.org/jira/browse/ROCKETMQ-17
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/13cba188
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/13cba188
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/13cba188
Branch: refs/heads/spec
Commit: 13cba18842604067e20d6c1048f83e614e4ac949
Parents: 3504967
Author: vintagewang <vi...@apache.org>
Authored: Wed Dec 28 14:06:14 2016 +0800
Committer: vintagewang <vi...@apache.org>
Committed: Wed Dec 28 14:06:14 2016 +0800
----------------------------------------------------------------------
.../apache/openmessaging/MessagingEndPoint.java | 2 +
.../openmessaging/MessagingEndPointManager.java | 4 +-
.../internal/MessagingEndPointFactory.java | 11 ++---
.../messaging-user-level-samples/java/pom.xml | 27 +++++++++++++
.../apache/openmessaging/samples/Producer.java | 42 ++++++++++++++++++++
spec/code/pom.xml | 6 +++
6 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPoint.java
----------------------------------------------------------------------
diff --git a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPoint.java b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPoint.java
index f06d7e5..f90cb62 100644
--- a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPoint.java
+++ b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPoint.java
@@ -30,4 +30,6 @@ public interface MessagingEndPoint {
PushConsumer createPushConsumer();
PullConsumer createPullConsumer();
+
+ BytesMessage createBytesMessage(final String topic, final byte[] body);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPointManager.java
----------------------------------------------------------------------
diff --git a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPointManager.java b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPointManager.java
index 5635893..0ecd870 100644
--- a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPointManager.java
+++ b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPointManager.java
@@ -28,11 +28,11 @@ import java.util.Properties;
* @author vintagewang@apache.org
*/
public class MessagingEndPointManager {
- public static MessagingEndPoint getMessagingEndPoint(String url) throws Exception {
+ public static MessagingEndPoint getMessagingEndPoint(String url) {
return getMessagingEndPoint(url, new Properties());
}
- public static MessagingEndPoint getMessagingEndPoint(String url, Properties properties) throws Exception {
+ public static MessagingEndPoint getMessagingEndPoint(String url, Properties properties) {
Map<String, List<String>> driverUrl = URISpecParser.parseURI(url);
if (null == driverUrl || driverUrl.size() == 0) {
throw new IllegalArgumentException("driver url parsed result.size ==0");
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/internal/MessagingEndPointFactory.java
----------------------------------------------------------------------
diff --git a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/internal/MessagingEndPointFactory.java b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/internal/MessagingEndPointFactory.java
index 7d521ff..1dbc71d 100644
--- a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/internal/MessagingEndPointFactory.java
+++ b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/internal/MessagingEndPointFactory.java
@@ -19,7 +19,6 @@ package org.apache.openmessaging.internal;
import org.apache.openmessaging.MessagingEndPoint;
-import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -28,13 +27,15 @@ import java.util.Properties;
* @author vintagewang@apache.org
*/
public class MessagingEndPointFactory {
- public static MessagingEndPoint createMessagingEndPoint(Map<String, List<String>> url, Properties properties)
- throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
- InstantiationException, IllegalAccessException {
+ public static MessagingEndPoint createMessagingEndPoint(Map<String, List<String>> url, Properties properties) {
List<String> driver = url.get(ServiceConstants.SPI_NAME);
List<String> urls = url.get(ServiceConstants.URL_NAME);
if (urls != null && urls.size() > 0)
properties.put(ServiceConstants.URL, urls.get(0));
- return MessagingEndPointAdapter.instantiateMessagingEndPoint(driver.get(0), properties);
+ try {
+ return MessagingEndPointAdapter.instantiateMessagingEndPoint(driver.get(0), properties);
+ } catch (Exception e) {
+ throw new RuntimeException("createMessagingEndPoint exception", e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/messaging-user-level-samples/java/pom.xml
----------------------------------------------------------------------
diff --git a/spec/code/messaging-user-level-samples/java/pom.xml b/spec/code/messaging-user-level-samples/java/pom.xml
new file mode 100644
index 0000000..67f4151
--- /dev/null
+++ b/spec/code/messaging-user-level-samples/java/pom.xml
@@ -0,0 +1,27 @@
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>open-standard-all</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>messaging-user-level-samples</artifactId>
+ <name>messaging-user-level-samples ${project.version}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>messaging-user-level-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/messaging-user-level-samples/java/src/main/java/org/apache/openmessaging/samples/Producer.java
----------------------------------------------------------------------
diff --git a/spec/code/messaging-user-level-samples/java/src/main/java/org/apache/openmessaging/samples/Producer.java b/spec/code/messaging-user-level-samples/java/src/main/java/org/apache/openmessaging/samples/Producer.java
new file mode 100644
index 0000000..30a0d78
--- /dev/null
+++ b/spec/code/messaging-user-level-samples/java/src/main/java/org/apache/openmessaging/samples/Producer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openmessaging.samples;
+
+
+import org.apache.openmessaging.MessagingEndPoint;
+import org.apache.openmessaging.MessagingEndPointManager;
+
+import java.nio.charset.Charset;
+
+public class Producer {
+ public static void main(String[] args) {
+ final MessagingEndPoint messagingEndPoint = MessagingEndPointManager.getMessagingEndPoint("openmessaging:rocketmq://localhost:10911/namespace");
+
+ final org.apache.openmessaging.Producer producer = messagingEndPoint.createProducer();
+
+ producer.start();
+
+ producer.send(messagingEndPoint.createBytesMessage("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ producer.shutdown();
+ }
+ }));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/pom.xml
----------------------------------------------------------------------
diff --git a/spec/code/pom.xml b/spec/code/pom.xml
index fe83fc5..097431b 100644
--- a/spec/code/pom.xml
+++ b/spec/code/pom.xml
@@ -11,6 +11,7 @@
<modules>
<module>messaging-user-level-api/java</module>
+ <module>messaging-user-level-samples/java</module>
<module>messaging-wire-level-api</module>
</modules>
@@ -86,6 +87,11 @@
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>messaging-user-level-samples</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>messaging-user-level-api</artifactId>
<version>${project.version}</version>
</dependency>
[05/10] incubator-rocketmq git commit: [ROCKETMQ-9] Clean up the code
Posted by vi...@apache.org.
[ROCKETMQ-9] Clean up the code
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/e5892e16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/e5892e16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/e5892e16
Branch: refs/heads/spec
Commit: e5892e164d469acf687c1cb3e598c6c44f9ef1c1
Parents: d1fa869
Author: Willem Jiang <wi...@gmail.com>
Authored: Tue Dec 27 13:33:43 2016 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Dec 27 13:33:43 2016 +0800
----------------------------------------------------------------------
.../main/java/com/alibaba/rocketmq/store/MappedFileQueue.java | 6 +++---
.../java/com/alibaba/rocketmq/store/index/IndexService.java | 4 ++--
rocketmq-store/src/test/resources/logback-test.xml | 1 +
3 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e5892e16/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
index 8d9d3ab..0d15ece 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
@@ -484,10 +484,10 @@ public class MappedFileQueue {
try {
return this.mappedFiles.get(index);
} catch (Exception e) {
- if (returnFirstOnNotFound)
+ if (returnFirstOnNotFound) {
return mappedFile;
-
- LOG_ERROR.warn("findMappedFileByOffset failure. {}", UtilAll.currentStackTrace());
+ }
+ LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e5892e16/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
index fded747..f4f27bc 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
@@ -279,7 +279,7 @@ public class IndexService {
*
* @return {@link IndexFile} or null on failure.
*/
- private IndexFile retryGetAndCreateIndexFile() {
+ public IndexFile retryGetAndCreateIndexFile() {
IndexFile indexFile = null;
for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
@@ -288,7 +288,7 @@ public class IndexService {
break;
try {
- log.error("Tried to create index file " + times + " times");
+ log.info("Tried to create index file " + times + " times");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e5892e16/rocketmq-store/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/resources/logback-test.xml b/rocketmq-store/src/test/resources/logback-test.xml
index 11d429d..6754c0a 100644
--- a/rocketmq-store/src/test/resources/logback-test.xml
+++ b/rocketmq-store/src/test/resources/logback-test.xml
@@ -20,6 +20,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
[07/10] incubator-rocketmq git commit: Allow setting base factor for
commercial data.
Posted by vi...@apache.org.
Allow setting base factor for commercial data.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/0c022e05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/0c022e05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/0c022e05
Branch: refs/heads/spec
Commit: 0c022e05af86fa53c71bcb92b93a8a2b6fb82908
Parents: e5892e1
Author: yukon <yu...@apache.org>
Authored: Tue Dec 27 16:38:25 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Dec 27 16:38:25 2016 +0800
----------------------------------------------------------------------
.../rocketmq/broker/processor/PullMessageProcessor.java | 4 +++-
.../rocketmq/broker/processor/SendMessageProcessor.java | 3 ++-
.../main/java/com/alibaba/rocketmq/common/BrokerConfig.java | 9 +++++++++
3 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
index 0152b93..1257f18 100644
--- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
@@ -299,9 +299,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
switch (response.getCode()) {
case ResponseCode.SUCCESS:
+ int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+ int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
- context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial());
+ context.setCommercialRcvTimes(incValue);
context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
context.setCommercialOwner(owner);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
index 414b3f4..a375285 100644
--- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
@@ -428,8 +428,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
sendMessageContext.setQueueId(responseHeader.getQueueId());
sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+ int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
- int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
index 6eae0a7..ba80a3f 100644
--- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
@@ -85,6 +85,7 @@ public class BrokerConfig {
private int commercialTimerCount = 1;
private int commercialTransCount = 1;
private int commercialBigCount = 1;
+ private int commercialBaseCount = 1;
private boolean transferMsgByHeap = true;
private int maxDelayTime = 40;
@@ -537,4 +538,12 @@ public class BrokerConfig {
public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
}
+
+ public int getCommercialBaseCount() {
+ return commercialBaseCount;
+ }
+
+ public void setCommercialBaseCount(int commercialBaseCount) {
+ this.commercialBaseCount = commercialBaseCount;
+ }
}
[03/10] incubator-rocketmq git commit: [ROCKETMQ-13] Wrong log level
for AcceptSocketService termination.
Posted by vi...@apache.org.
[ROCKETMQ-13] Wrong log level for AcceptSocketService termination.
Additionally, added code comments and did a cleanup.
JIRA issue: https://issues.apache.org/jira/browse/ROCKETMQ-13
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/fed09763
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/fed09763
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/fed09763
Branch: refs/heads/spec
Commit: fed09763bccb73696d1953937b8b1eb1ce88b131
Parents: 626990c
Author: shtykh_roman <rs...@yahoo.com>
Authored: Mon Dec 26 12:17:53 2016 +0900
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Dec 26 15:12:41 2016 +0800
----------------------------------------------------------------------
.../alibaba/rocketmq/store/ha/HAService.java | 26 +++++++++++++-------
1 file changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/fed09763/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
index 075252c..2cf695c 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
@@ -46,7 +46,7 @@ public class HAService {
private final AtomicInteger connectionCount = new AtomicInteger(0);
- private final List<HAConnection> connectionList = new LinkedList<HAConnection>();
+ private final List<HAConnection> connectionList = new LinkedList<>();
private final AcceptSocketService acceptSocketService;
@@ -170,17 +170,22 @@ public class HAService {
return push2SlaveMaxOffset;
}
+ /**
+ * Listens to slave connections to create {@link HAConnection}.
+ */
class AcceptSocketService extends ServiceThread {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private final SocketAddress socketAddressListen;
-
public AcceptSocketService(final int port) {
this.socketAddressListen = new InetSocketAddress(port);
}
-
+ /**
+ * Starts listening to slave connections.
+ * @throws Exception If fails.
+ */
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
@@ -190,6 +195,7 @@ public class HAService {
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
+ /** {@inheritDoc} */
@Override
public void shutdown(final boolean interrupt) {
super.shutdown(interrupt);
@@ -202,6 +208,7 @@ public class HAService {
}
}
+ /** {@inheritDoc} */
@Override
public void run() {
log.info(this.getServiceName() + " service started");
@@ -210,10 +217,12 @@ public class HAService {
try {
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
+
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
+
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
@@ -234,16 +243,15 @@ public class HAService {
selected.clear();
}
-
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
- log.error(this.getServiceName() + " service end");
+ log.info(this.getServiceName() + " service end");
}
-
+ /** {@inheritDoc} */
@Override
public String getServiceName() {
return AcceptSocketService.class.getSimpleName();
@@ -256,8 +264,8 @@ public class HAService {
class GroupTransferService extends ServiceThread {
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
- private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
- private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
+ private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>();
+ private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>();
public void putRequest(final GroupCommitRequest request) {
@@ -333,7 +341,7 @@ public class HAService {
class HAClient extends ServiceThread {
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
- private final AtomicReference<String> masterAddress = new AtomicReference<String>();
+ private final AtomicReference<String> masterAddress = new AtomicReference<>();
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
private SocketChannel socketChannel;
private Selector selector;
[09/10] incubator-rocketmq git commit: Merge branch 'master' into spec
Posted by vi...@apache.org.
Merge branch 'master' into spec
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/35049678
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/35049678
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/35049678
Branch: refs/heads/spec
Commit: 35049678309969d57bcc5c7c116076ee4698bd72
Parents: ccdbdf5 1356e35
Author: vintagewang <vi...@apache.org>
Authored: Wed Dec 28 13:39:58 2016 +0800
Committer: vintagewang <vi...@apache.org>
Committed: Wed Dec 28 13:39:58 2016 +0800
----------------------------------------------------------------------
.../broker/processor/PullMessageProcessor.java | 4 +-
.../broker/processor/SendMessageProcessor.java | 3 +-
.../rocketmq/broker/BrokerControllerTest.java | 10 ++-
.../rocketmq/broker/api/SendMessageTest.java | 32 ++++------
.../broker/topic/TopicConfigManagerTest.java | 12 ++--
.../alibaba/rocketmq/common/BrokerConfig.java | 9 +++
.../remoting/netty/NettyRemotingAbstract.java | 67 +++++++++++---------
.../rocketmq/remoting/NettyConnectionTest.java | 52 +++++++++++++++
.../store/AllocateMappedFileService.java | 2 +-
.../alibaba/rocketmq/store/MappedFileQueue.java | 22 +++++--
.../alibaba/rocketmq/store/ha/HAService.java | 31 +++++----
.../alibaba/rocketmq/store/index/IndexFile.java | 5 +-
.../rocketmq/store/index/IndexService.java | 29 +++++----
.../rocketmq/store/MappedFileQueueTest.java | 17 ++++-
.../rocketmq/store/index/IndexFileTest.java | 18 ++++--
.../src/test/resources/logback-test.xml | 3 +-
16 files changed, 212 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
[04/10] incubator-rocketmq git commit: [ROCKETMQ-9] Errors in
rocketmq-store module.
Posted by vi...@apache.org.
[ROCKETMQ-9] Errors in rocketmq-store module.
JIRA issue: https://issues.apache.org/jira/browse/ROCKETMQ-9
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/d1fa8694
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/d1fa8694
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/d1fa8694
Branch: refs/heads/spec
Commit: d1fa8694ff82b429dcd811156edf7ea8a702237e
Parents: fed0976
Author: shtykh_roman <rs...@yahoo.com>
Authored: Mon Dec 26 18:32:00 2016 +0900
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Dec 27 09:50:23 2016 +0800
----------------------------------------------------------------------
.../store/AllocateMappedFileService.java | 2 +-
.../alibaba/rocketmq/store/MappedFileQueue.java | 26 ++++++++++------
.../alibaba/rocketmq/store/index/IndexFile.java | 5 ++--
.../rocketmq/store/index/IndexService.java | 31 +++++++++++---------
.../rocketmq/store/MappedFileQueueTest.java | 17 +++++++++--
.../rocketmq/store/index/IndexFileTest.java | 18 +++++++-----
.../src/test/resources/logback-test.xml | 2 +-
7 files changed, 63 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
index 40eee7a..06113c8 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
@@ -213,7 +213,7 @@ public class AllocateMappedFileService extends ServiceThread {
isSuccess = true;
}
} catch (InterruptedException e) {
- log.warn(this.getServiceName() + " service has exception, maybe by shutdown");
+ log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
index 2b006c0..8d9d3ab 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
@@ -459,27 +459,35 @@ public class MappedFileQueue {
return result;
}
-
+ /**
+ * Finds a mapped file by offset.
+ *
+ * @param offset Offset.
+ * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
+ * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
+ */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
if (index < 0 || index >= this.mappedFiles.size()) {
- LOG_ERROR.warn("findMappedFileByOffset offset not matched, request Offset: {}, index: {}, mappedFileSize: {}, mappedFiles count: {}, StackTrace: {}",
- offset,
- index,
- this.mappedFileSize,
- this.mappedFiles.size(),
- UtilAll.currentStackTrace());
+ LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
+ "mappedFileSize: {}, mappedFiles count: {}",
+ mappedFile,
+ offset,
+ index,
+ this.mappedFileSize,
+ this.mappedFiles.size());
}
try {
return this.mappedFiles.get(index);
} catch (Exception e) {
- if (returnFirstOnNotFound) {
+ if (returnFirstOnNotFound)
return mappedFile;
- }
+
+ LOG_ERROR.warn("findMappedFileByOffset failure. {}", UtilAll.currentStackTrace());
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
index f353320..befa5f9 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
@@ -94,7 +94,6 @@ public class IndexFile {
return this.indexHeader.getIndexCount() >= this.indexNum;
}
-
public boolean destroy(final long intervalForcibly) {
return this.mappedFile.destroy(intervalForcibly);
}
@@ -167,8 +166,8 @@ public class IndexFile {
}
}
} else {
- log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num "
- + this.indexNum);
+ log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ + "; index max num = " + this.indexNum);
}
return false;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
index f275f80..fded747 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
@@ -49,6 +49,9 @@ public class IndexService {
private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ /** Maximum times to attempt index file creation. */
+ private static final int MAX_TRY_IDX_CREATE = 3;
+
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
@@ -257,44 +260,44 @@ public class IndexService {
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
- for (boolean ok =
- indexFile.putKey(idxKey, msg.getCommitLogOffset(),
- msg.getStoreTimestamp()); !ok; ) {
- log.warn("index file full, so create another one, " + indexFile.getFileName());
+ for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
+ log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
+
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
return null;
}
- ok =
- indexFile.putKey(idxKey, msg.getCommitLogOffset(),
- msg.getStoreTimestamp());
+ ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
}
+
return indexFile;
}
-
- public IndexFile retryGetAndCreateIndexFile() {
+ /**
+ * Retries to get or create index file.
+ *
+ * @return {@link IndexFile} or null on failure.
+ */
+ private IndexFile retryGetAndCreateIndexFile() {
IndexFile indexFile = null;
-
- for (int times = 0; null == indexFile && times < 3; times++) {
+ for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
indexFile = this.getAndCreateLastIndexFile();
if (null != indexFile)
break;
try {
- log.error("try to create index file, " + times + " times");
+ log.error("Tried to create index file " + times + " times");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
-
if (null == indexFile) {
this.defaultMessageStore.getAccessRights().makeIndexFileError();
- log.error("mark index file can not build flag");
+ log.error("Mark index file cannot build flag");
}
return indexFile;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
index 89b37be..700f1c6 100644
--- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
@@ -52,6 +52,7 @@ public class MappedFileQueueTest {
@Test
public void test_getLastMapedFile() {
final String fixedMsg = "0123456789abcdef";
+
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/a/", 1024, null);
@@ -59,6 +60,7 @@ public class MappedFileQueueTest {
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
+
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
if (!result) {
logger.debug("appendMessage " + i);
@@ -74,7 +76,9 @@ public class MappedFileQueueTest {
@Test
public void test_findMapedFileByOffset() {
+ // four-byte string.
final String fixedMsg = "abcd";
+
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/b/", 1024, null);
@@ -82,11 +86,13 @@ public class MappedFileQueueTest {
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
+
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
- // logger.debug("appendMessage " + bytes);
assertTrue(result);
}
+ assertEquals(fixedMsg.getBytes().length * 1024, mappedFileQueue.getMappedMemorySize());
+
MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 0);
@@ -110,7 +116,8 @@ public class MappedFileQueueTest {
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
-
+
+ // over mapped memory size.
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4);
assertTrue(mappedFile == null);
@@ -125,6 +132,7 @@ public class MappedFileQueueTest {
@Test
public void test_commit() {
final String fixedMsg = "0123456789abcdef";
+
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/c/", 1024, null);
@@ -132,6 +140,7 @@ public class MappedFileQueueTest {
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
+
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
assertTrue(result);
}
@@ -168,6 +177,7 @@ public class MappedFileQueueTest {
@Test
public void test_getMapedMemorySize() {
final String fixedMsg = "abcd";
+
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/d/", 1024, null);
@@ -175,14 +185,15 @@ public class MappedFileQueueTest {
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
+
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
assertTrue(result);
}
assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize());
+
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
logger.debug("MappedFileQueue.getMappedMemorySize() OK");
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
index f6bfc0a..9e446f7 100644
--- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
@@ -31,17 +31,18 @@ import static org.junit.Assert.assertTrue;
public class IndexFileTest {
- private static final int hashSlotNum = 100;
- private static final int indexNum = 400;
+ private static final int HASH_SLOT_NUM = 100;
+ private static final int INDEX_NUM = 400;
@Test
public void test_put_index() throws Exception {
- IndexFile indexFile = new IndexFile("100", hashSlotNum, indexNum, 0, 0);
- for (long i = 0; i < (indexNum - 1); i++) {
+ IndexFile indexFile = new IndexFile("100", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
+ for (long i = 0; i < (INDEX_NUM - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
}
-
+
+ // put over index file capacity.
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
@@ -51,12 +52,14 @@ public class IndexFileTest {
@Test
public void test_put_get_index() throws Exception {
- IndexFile indexFile = new IndexFile("200", hashSlotNum, indexNum, 0, 0);
+ IndexFile indexFile = new IndexFile("200", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
- for (long i = 0; i < (indexNum - 1); i++) {
+ for (long i = 0; i < (INDEX_NUM - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
}
+
+ // put over index file capacity.
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
@@ -64,6 +67,7 @@ public class IndexFileTest {
indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
assertFalse(phyOffsets.isEmpty());
assertEquals(1, phyOffsets.size());
+
indexFile.destroy(0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/resources/logback-test.xml b/rocketmq-store/src/test/resources/logback-test.xml
index acdfa10..11d429d 100644
--- a/rocketmq-store/src/test/resources/logback-test.xml
+++ b/rocketmq-store/src/test/resources/logback-test.xml
@@ -27,7 +27,7 @@
<appender-ref ref="STDOUT" />
</logger>
- <root level="WARN">
+ <root level="ERROR">
<appender-ref ref="STDOUT" />
</root>