You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 06:06:50 UTC

[01/10] incubator-rocketmq git commit: ROCKETMQ-2 Closed the selector.

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/spec ccdbdf575 -> 13cba1884


ROCKETMQ-2 Closed the selector.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/b5afe91d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/b5afe91d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/b5afe91d

Branch: refs/heads/spec
Commit: b5afe91df86c09187771bd89c28b9f51d69ccf01
Parents: 774101d
Author: shroman <rs...@yahoo.com>
Authored: Sun Dec 25 18:38:12 2016 +0900
Committer: Willem Jiang <wi...@gmail.com>
Committed: Sun Dec 25 20:59:19 2016 +0800

----------------------------------------------------------------------
 .../com/alibaba/rocketmq/broker/BrokerControllerTest.java   | 9 ++++++++-
 .../main/java/com/alibaba/rocketmq/store/ha/HAService.java  | 5 +++--
 2 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b5afe91d/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
index 6b0b62d..9246d6f 100644
--- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
@@ -22,11 +22,15 @@ import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
 import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
 import com.alibaba.rocketmq.store.config.MessageStoreConfig;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author shtykh_roman
  */
 public class BrokerControllerTest {
+    protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class);
+
     private static final int RESTART_NUM = 3;
 
     /**
@@ -44,10 +48,13 @@ public class BrokerControllerTest {
                 new NettyClientConfig(), //
                 new MessageStoreConfig());
             boolean initResult = brokerController.initialize();
-            System.out.println("initialize " + initResult);
+            logger.info("Broker is initialized " + initResult);
+
             brokerController.start();
+            logger.info("Broker is started");
 
             brokerController.shutdown();
+            logger.info("Broker is stopped");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b5afe91d/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
index 5f93753..075252c 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
@@ -173,7 +173,7 @@ public class HAService {
     class AcceptSocketService extends ServiceThread {
         private ServerSocketChannel serverSocketChannel;
         private Selector selector;
-        private SocketAddress socketAddressListen;
+        private final SocketAddress socketAddressListen;
 
 
         public AcceptSocketService(final int port) {
@@ -194,7 +194,8 @@ public class HAService {
         public void shutdown(final boolean interrupt) {
             super.shutdown(interrupt);
             try {
-                serverSocketChannel.close();
+                this.serverSocketChannel.close();
+                this.selector.close();
             }
             catch (IOException e) {
                 log.error("AcceptSocketService shutdown exception", e);


[06/10] incubator-rocketmq git commit: [ROCKETMQ-3] Remove try...catch, using assertNull etc

Posted by vi...@apache.org.
[ROCKETMQ-3] Remove try...catch, using assertNull etc


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/626990cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/626990cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/626990cf

Branch: refs/heads/spec
Commit: 626990cf352e4832b2dfc4d96e710d43723849a2
Parents: 03880d7
Author: dongeforever <zh...@yeah.net>
Authored: Mon Dec 26 10:31:45 2016 +0800
Committer: lollipop <lo...@apache.org>
Committed: Tue Dec 27 14:53:17 2016 +0800

----------------------------------------------------------------------
 .../java/com/alibaba/rocketmq/broker/BrokerControllerTest.java    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/626990cf/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
index 9246d6f..b661385 100644
--- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
@@ -21,6 +21,7 @@ import com.alibaba.rocketmq.common.BrokerConfig;
 import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
 import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
 import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,8 +49,8 @@ public class BrokerControllerTest {
                 new NettyClientConfig(), //
                 new MessageStoreConfig());
             boolean initResult = brokerController.initialize();
+            Assert.assertTrue(initResult);
             logger.info("Broker is initialized " + initResult);
-
             brokerController.start();
             logger.info("Broker is started");
 


[08/10] incubator-rocketmq git commit: MASTER [ROCKETMQ-14] invoke callback shoule be invoked in an executor rather than in current thread, closes apache/incubator-rocketmq#2

Posted by vi...@apache.org.
MASTER [ROCKETMQ-14] invoke callback shoule be invoked in an executor rather than in current thread, closes apache/incubator-rocketmq#2


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/1356e35f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/1356e35f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/1356e35f

Branch: refs/heads/spec
Commit: 1356e35f45ebfbca27930fe06e7abd659f111fb4
Parents: 0c022e0
Author: Jaskey <li...@gmail.com>
Authored: Tue Dec 27 17:26:05 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Dec 27 17:26:05 2016 +0800

----------------------------------------------------------------------
 .../remoting/netty/NettyRemotingAbstract.java   | 67 +++++++++++---------
 .../rocketmq/remoting/NettyConnectionTest.java  | 52 +++++++++++++++
 2 files changed, 88 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 70ae5b5..1c3fdc5 100644
--- a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -198,35 +198,7 @@ public abstract class NettyRemotingAbstract {
             responseTable.remove(opaque);
 
             if (responseFuture.getInvokeCallback() != null) {
-                boolean runInThisThread = false;
-                ExecutorService executor = this.getCallbackExecutor();
-                if (executor != null) {
-                    try {
-                        executor.submit(new Runnable() {
-                            @Override
-                            public void run() {
-                                try {
-                                    responseFuture.executeInvokeCallback();
-                                } catch (Throwable e) {
-                                    PLOG.warn("execute callback in executor exception, and callback throw", e);
-                                }
-                            }
-                        });
-                    } catch (Exception e) {
-                        runInThisThread = true;
-                        PLOG.warn("execute callback in executor exception, maybe executor busy", e);
-                    }
-                } else {
-                    runInThisThread = true;
-                }
-
-                if (runInThisThread) {
-                    try {
-                        responseFuture.executeInvokeCallback();
-                    } catch (Throwable e) {
-                        PLOG.warn("executeInvokeCallback Exception", e);
-                    }
-                }
+                executeInvokeCallback(responseFuture);
             } else {
                 responseFuture.putResponse(cmd);
             }
@@ -236,6 +208,39 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    //execute callback in callback executor. If callback executor is null, run directly in current thread
+    private void executeInvokeCallback(final ResponseFuture responseFuture) {
+        boolean runInThisThread = false;
+        ExecutorService executor = this.getCallbackExecutor();
+        if (executor != null) {
+            try {
+                executor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            responseFuture.executeInvokeCallback();
+                        } catch (Throwable e) {
+                            PLOG.warn("execute callback in executor exception, and callback throw", e);
+                        }
+                    }
+                });
+            } catch (Exception e) {
+                runInThisThread = true;
+                PLOG.warn("execute callback in executor exception, maybe executor busy", e);
+            }
+        } else {
+            runInThisThread = true;
+        }
+
+        if (runInThisThread) {
+            try {
+                responseFuture.executeInvokeCallback();
+            } catch (Throwable e) {
+                PLOG.warn("executeInvokeCallback Exception", e);
+            }
+        }
+    }
+
     public abstract RPCHook getRPCHook();
 
     abstract public ExecutorService getCallbackExecutor();
@@ -257,7 +262,7 @@ public abstract class NettyRemotingAbstract {
 
         for (ResponseFuture rf : rfList) {
             try {
-                rf.executeInvokeCallback();
+                executeInvokeCallback(rf);
             } catch (Throwable e) {
                 PLOG.warn("scanResponseTable, operationComplete Exception", e);
             }
@@ -329,7 +334,7 @@ public abstract class NettyRemotingAbstract {
                         responseFuture.putResponse(null);
                         responseTable.remove(opaque);
                         try {
-                            responseFuture.executeInvokeCallback();
+                            executeInvokeCallback(responseFuture);
                         } catch (Throwable e) {
                             PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
index e4ff948..755d332 100644
--- a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
@@ -22,9 +22,15 @@ import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
 import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
 import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
 import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
+import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
 import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 
 /**
 
@@ -51,6 +57,52 @@ public class NettyConnectionTest {
         System.out.println("-----------------------------------------------------------------");
     }
 
+
+    @Test
+    public void test_async_timeout() throws InterruptedException, RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException {
+        RemotingClient client = createRemotingClient();
+        final AtomicInteger ai = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(100);
+        for(int i=0;i<100;i++) {
+            try {
+                RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+                client.invokeAsync("localhost:8888", request, 5, new InvokeCallback() {//very easy to timeout
+                    @Override
+                    public void operationComplete(ResponseFuture responseFuture) {
+                        if (responseFuture.isTimeout()) {
+                            if(ai.getAndIncrement()==4) {
+                                try {
+                                    System.out.println("First try timeout,  blocking 10s" + Thread.currentThread().getName());
+                                    Thread.sleep(10 * 1000);
+                                } catch (InterruptedException e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                            else{
+                                System.out.println("Timeout callback execute,very short."+Thread.currentThread().getName());
+                            }
+                        }
+                        else{
+                            System.out.println("Success."+Thread.currentThread().getName());
+                        }
+                        latch.countDown();
+
+                    }
+                });
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+
+
+        latch.await(1000, TimeUnit.MILLISECONDS);
+        Assert.assertEquals(1, latch.getCount());//only one should be blocked
+        client.shutdown();
+        System.out.println("-----------------------------------------------------------------");
+    }
+
     public static RemotingClient createRemotingClient() {
         NettyClientConfig config = new NettyClientConfig();
         config.setClientChannelMaxIdleTimeSeconds(15);


[02/10] incubator-rocketmq git commit: ROCKETMQ-3 Clean up and perfect the unit test with thanks to zander

Posted by vi...@apache.org.
ROCKETMQ-3 Clean up and perfect the unit test with thanks to zander


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/03880d77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/03880d77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/03880d77

Branch: refs/heads/spec
Commit: 03880d77a984403170802e9f32a568e57347e3ab
Parents: b5afe91
Author: Willem Jiang <wi...@gmail.com>
Authored: Mon Dec 26 10:38:39 2016 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Dec 26 10:38:39 2016 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/api/SendMessageTest.java    | 32 +++++++++-----------
 .../broker/topic/TopicConfigManagerTest.java    | 12 ++++----
 2 files changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03880d77/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
index d9babc2..cf97876 100644
--- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
@@ -63,25 +63,21 @@ public class SendMessageTest extends BrokerTestHarness{
     }
 
     @Test
-    public void testSendSingle() throws Exception {
+    public void testSendSingle() throws Exception{
         Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes());
-        try {
-            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
-            requestHeader.setProducerGroup("abc");
-            requestHeader.setTopic(msg.getTopic());
-            requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
-            requestHeader.setDefaultTopicQueueNums(4);
-            requestHeader.setQueueId(0);
-            requestHeader.setSysFlag(0);
-            requestHeader.setBornTimestamp(System.currentTimeMillis());
-            requestHeader.setFlag(msg.getFlag());
-            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+        SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+        requestHeader.setProducerGroup("abc");
+        requestHeader.setTopic(msg.getTopic());
+        requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
+        requestHeader.setDefaultTopicQueueNums(4);
+        requestHeader.setQueueId(0);
+        requestHeader.setSysFlag(0);
+        requestHeader.setBornTimestamp(System.currentTimeMillis());
+        requestHeader.setFlag(msg.getFlag());
+        requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
 
-            SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5,
-                    CommunicationMode.SYNC, new SendMessageContext(), null);
-            assertTrue(result.getSendStatus() == SendStatus.SEND_OK);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5,
+                CommunicationMode.SYNC, new SendMessageContext(), null);
+        assertEquals(result.getSendStatus(), SendStatus.SEND_OK);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03880d77/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
index 7a6503f..1c93b02 100644
--- a/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ b/rocketmq-broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -25,7 +25,7 @@ import com.alibaba.rocketmq.common.MixAll;
 import com.alibaba.rocketmq.common.TopicConfig;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 
 /**
@@ -39,7 +39,7 @@ public class TopicConfigManagerTest extends BrokerTestHarness {
         for (int i = 0; i < 10; i++) {
             String topic = "UNITTEST-" + i;
             TopicConfig topicConfig = topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0);
-            assertTrue(topicConfig != null);
+            assertNotNull(topicConfig);
         }
         topicConfigManager.persist();
 
@@ -48,15 +48,15 @@ public class TopicConfigManagerTest extends BrokerTestHarness {
         for (int i = 0; i < 10; i++) {
             String topic = "UNITTEST-" + i;
             TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
-            assertTrue(topicConfig == null);
+            assertNull(topicConfig);
         }
         topicConfigManager.load();
         for (int i = 0; i < 10; i++) {
             String topic = "UNITTEST-" + i;
             TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
-            assertTrue(topicConfig != null);
-            assertTrue(topicConfig.getTopicSysFlag() == 0);
-            assertTrue(topicConfig.getReadQueueNums() == 4);
+            assertNotNull(topicConfig);
+            assertEquals(topicConfig.getTopicSysFlag(), 0);
+            assertEquals(topicConfig.getReadQueueNums(), 4);
         }
     }
 }


[10/10] incubator-rocketmq git commit: [ROCKETMQ-17] Develop a vendor-neutral open standard for distributed messaging: add producer sample https://issues.apache.org/jira/browse/ROCKETMQ-17

Posted by vi...@apache.org.
[ROCKETMQ-17] Develop a vendor-neutral open standard for distributed messaging: add producer sample
https://issues.apache.org/jira/browse/ROCKETMQ-17


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/13cba188
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/13cba188
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/13cba188

Branch: refs/heads/spec
Commit: 13cba18842604067e20d6c1048f83e614e4ac949
Parents: 3504967
Author: vintagewang <vi...@apache.org>
Authored: Wed Dec 28 14:06:14 2016 +0800
Committer: vintagewang <vi...@apache.org>
Committed: Wed Dec 28 14:06:14 2016 +0800

----------------------------------------------------------------------
 .../apache/openmessaging/MessagingEndPoint.java |  2 +
 .../openmessaging/MessagingEndPointManager.java |  4 +-
 .../internal/MessagingEndPointFactory.java      | 11 ++---
 .../messaging-user-level-samples/java/pom.xml   | 27 +++++++++++++
 .../apache/openmessaging/samples/Producer.java  | 42 ++++++++++++++++++++
 spec/code/pom.xml                               |  6 +++
 6 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPoint.java
----------------------------------------------------------------------
diff --git a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPoint.java b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPoint.java
index f06d7e5..f90cb62 100644
--- a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPoint.java
+++ b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPoint.java
@@ -30,4 +30,6 @@ public interface MessagingEndPoint {
     PushConsumer createPushConsumer();
 
     PullConsumer createPullConsumer();
+
+    BytesMessage createBytesMessage(final String topic, final byte[] body);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPointManager.java
----------------------------------------------------------------------
diff --git a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPointManager.java b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPointManager.java
index 5635893..0ecd870 100644
--- a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPointManager.java
+++ b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/MessagingEndPointManager.java
@@ -28,11 +28,11 @@ import java.util.Properties;
  * @author vintagewang@apache.org
  */
 public class MessagingEndPointManager {
-    public static MessagingEndPoint getMessagingEndPoint(String url) throws Exception {
+    public static MessagingEndPoint getMessagingEndPoint(String url) {
         return getMessagingEndPoint(url, new Properties());
     }
 
-    public static MessagingEndPoint getMessagingEndPoint(String url, Properties properties) throws Exception {
+    public static MessagingEndPoint getMessagingEndPoint(String url, Properties properties) {
         Map<String, List<String>> driverUrl = URISpecParser.parseURI(url);
         if (null == driverUrl || driverUrl.size() == 0) {
             throw new IllegalArgumentException("driver url parsed result.size ==0");

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/internal/MessagingEndPointFactory.java
----------------------------------------------------------------------
diff --git a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/internal/MessagingEndPointFactory.java b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/internal/MessagingEndPointFactory.java
index 7d521ff..1dbc71d 100644
--- a/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/internal/MessagingEndPointFactory.java
+++ b/spec/code/messaging-user-level-api/java/src/main/java/org/apache/openmessaging/internal/MessagingEndPointFactory.java
@@ -19,7 +19,6 @@ package org.apache.openmessaging.internal;
 
 import org.apache.openmessaging.MessagingEndPoint;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -28,13 +27,15 @@ import java.util.Properties;
  * @author vintagewang@apache.org
  */
 public class MessagingEndPointFactory {
-    public static MessagingEndPoint createMessagingEndPoint(Map<String, List<String>> url, Properties properties)
-            throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
-            InstantiationException, IllegalAccessException {
+    public static MessagingEndPoint createMessagingEndPoint(Map<String, List<String>> url, Properties properties) {
         List<String> driver = url.get(ServiceConstants.SPI_NAME);
         List<String> urls = url.get(ServiceConstants.URL_NAME);
         if (urls != null && urls.size() > 0)
             properties.put(ServiceConstants.URL, urls.get(0));
-        return MessagingEndPointAdapter.instantiateMessagingEndPoint(driver.get(0), properties);
+        try {
+            return MessagingEndPointAdapter.instantiateMessagingEndPoint(driver.get(0), properties);
+        } catch (Exception e) {
+            throw new RuntimeException("createMessagingEndPoint exception", e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/messaging-user-level-samples/java/pom.xml
----------------------------------------------------------------------
diff --git a/spec/code/messaging-user-level-samples/java/pom.xml b/spec/code/messaging-user-level-samples/java/pom.xml
new file mode 100644
index 0000000..67f4151
--- /dev/null
+++ b/spec/code/messaging-user-level-samples/java/pom.xml
@@ -0,0 +1,27 @@
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>open-standard-all</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>messaging-user-level-samples</artifactId>
+    <name>messaging-user-level-samples ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>messaging-user-level-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/messaging-user-level-samples/java/src/main/java/org/apache/openmessaging/samples/Producer.java
----------------------------------------------------------------------
diff --git a/spec/code/messaging-user-level-samples/java/src/main/java/org/apache/openmessaging/samples/Producer.java b/spec/code/messaging-user-level-samples/java/src/main/java/org/apache/openmessaging/samples/Producer.java
new file mode 100644
index 0000000..30a0d78
--- /dev/null
+++ b/spec/code/messaging-user-level-samples/java/src/main/java/org/apache/openmessaging/samples/Producer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openmessaging.samples;
+
+
+import org.apache.openmessaging.MessagingEndPoint;
+import org.apache.openmessaging.MessagingEndPointManager;
+
+import java.nio.charset.Charset;
+
+public class Producer {
+    public static void main(String[] args) {
+        final MessagingEndPoint messagingEndPoint = MessagingEndPointManager.getMessagingEndPoint("openmessaging:rocketmq://localhost:10911/namespace");
+
+        final org.apache.openmessaging.Producer producer = messagingEndPoint.createProducer();
+
+        producer.start();
+
+        producer.send(messagingEndPoint.createBytesMessage("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                producer.shutdown();
+            }
+        }));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/13cba188/spec/code/pom.xml
----------------------------------------------------------------------
diff --git a/spec/code/pom.xml b/spec/code/pom.xml
index fe83fc5..097431b 100644
--- a/spec/code/pom.xml
+++ b/spec/code/pom.xml
@@ -11,6 +11,7 @@
 
     <modules>
         <module>messaging-user-level-api/java</module>
+        <module>messaging-user-level-samples/java</module>
         <module>messaging-wire-level-api</module>
     </modules>
 
@@ -86,6 +87,11 @@
         <dependencies>
             <dependency>
                 <groupId>${project.groupId}</groupId>
+                <artifactId>messaging-user-level-samples</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
                 <artifactId>messaging-user-level-api</artifactId>
                 <version>${project.version}</version>
             </dependency>


[05/10] incubator-rocketmq git commit: [ROCKETMQ-9] Clean up the code

Posted by vi...@apache.org.
[ROCKETMQ-9] Clean up the code


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/e5892e16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/e5892e16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/e5892e16

Branch: refs/heads/spec
Commit: e5892e164d469acf687c1cb3e598c6c44f9ef1c1
Parents: d1fa869
Author: Willem Jiang <wi...@gmail.com>
Authored: Tue Dec 27 13:33:43 2016 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Dec 27 13:33:43 2016 +0800

----------------------------------------------------------------------
 .../main/java/com/alibaba/rocketmq/store/MappedFileQueue.java  | 6 +++---
 .../java/com/alibaba/rocketmq/store/index/IndexService.java    | 4 ++--
 rocketmq-store/src/test/resources/logback-test.xml             | 1 +
 3 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e5892e16/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
index 8d9d3ab..0d15ece 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
@@ -484,10 +484,10 @@ public class MappedFileQueue {
                 try {
                     return this.mappedFiles.get(index);
                 } catch (Exception e) {
-                    if (returnFirstOnNotFound)
+                    if (returnFirstOnNotFound) {
                         return mappedFile;
-
-                    LOG_ERROR.warn("findMappedFileByOffset failure. {}", UtilAll.currentStackTrace());
+                    }
+                    LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
                 }
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e5892e16/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
index fded747..f4f27bc 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
@@ -279,7 +279,7 @@ public class IndexService {
      *
      * @return {@link IndexFile} or null on failure.
      */
-    private IndexFile retryGetAndCreateIndexFile() {
+    public IndexFile retryGetAndCreateIndexFile() {
         IndexFile indexFile = null;
 
         for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
@@ -288,7 +288,7 @@ public class IndexService {
                 break;
 
             try {
-                log.error("Tried to create index file " + times + " times");
+                log.info("Tried to create index file " + times + " times");
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
                 e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/e5892e16/rocketmq-store/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/resources/logback-test.xml b/rocketmq-store/src/test/resources/logback-test.xml
index 11d429d..6754c0a 100644
--- a/rocketmq-store/src/test/resources/logback-test.xml
+++ b/rocketmq-store/src/test/resources/logback-test.xml
@@ -20,6 +20,7 @@
   <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
       <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n</pattern>
+      <charset class="java.nio.charset.Charset">UTF-8</charset>
     </encoder>
   </appender>
 


[07/10] incubator-rocketmq git commit: Allow setting base factor for commercial data.

Posted by vi...@apache.org.
Allow setting base factor for commercial data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/0c022e05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/0c022e05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/0c022e05

Branch: refs/heads/spec
Commit: 0c022e05af86fa53c71bcb92b93a8a2b6fb82908
Parents: e5892e1
Author: yukon <yu...@apache.org>
Authored: Tue Dec 27 16:38:25 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Dec 27 16:38:25 2016 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/processor/PullMessageProcessor.java     | 4 +++-
 .../rocketmq/broker/processor/SendMessageProcessor.java     | 3 ++-
 .../main/java/com/alibaba/rocketmq/common/BrokerConfig.java | 9 +++++++++
 3 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
index 0152b93..1257f18 100644
--- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
@@ -299,9 +299,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
 
                 switch (response.getCode()) {
                     case ResponseCode.SUCCESS:
+                        int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+                        int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
 
                         context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
-                        context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial());
+                        context.setCommercialRcvTimes(incValue);
                         context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
                         context.setCommercialOwner(owner);
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
index 414b3f4..a375285 100644
--- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
@@ -428,8 +428,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                     sendMessageContext.setQueueId(responseHeader.getQueueId());
                     sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
 
+                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                     int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
-                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
 
                     sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                     sendMessageContext.setCommercialSendTimes(incValue);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0c022e05/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
index 6eae0a7..ba80a3f 100644
--- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
@@ -85,6 +85,7 @@ public class BrokerConfig {
     private int commercialTimerCount = 1;
     private int commercialTransCount = 1;
     private int commercialBigCount = 1;
+    private int commercialBaseCount = 1;
 
     private boolean transferMsgByHeap = true;
     private int maxDelayTime = 40;
@@ -537,4 +538,12 @@ public class BrokerConfig {
     public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
         this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
     }
+
+    public int getCommercialBaseCount() {
+        return commercialBaseCount;
+    }
+
+    public void setCommercialBaseCount(int commercialBaseCount) {
+        this.commercialBaseCount = commercialBaseCount;
+    }
 }


[03/10] incubator-rocketmq git commit: [ROCKETMQ-13] Wrong log level for AcceptSocketService termination.

Posted by vi...@apache.org.
[ROCKETMQ-13] Wrong log level for AcceptSocketService termination.

Additionally, added code comments and did a cleanup.

JIRA issue: https://issues.apache.org/jira/browse/ROCKETMQ-13


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/fed09763
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/fed09763
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/fed09763

Branch: refs/heads/spec
Commit: fed09763bccb73696d1953937b8b1eb1ce88b131
Parents: 626990c
Author: shtykh_roman <rs...@yahoo.com>
Authored: Mon Dec 26 12:17:53 2016 +0900
Committer: Willem Jiang <wi...@gmail.com>
Committed: Mon Dec 26 15:12:41 2016 +0800

----------------------------------------------------------------------
 .../alibaba/rocketmq/store/ha/HAService.java    | 26 +++++++++++++-------
 1 file changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/fed09763/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
index 075252c..2cf695c 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
@@ -46,7 +46,7 @@ public class HAService {
 
     private final AtomicInteger connectionCount = new AtomicInteger(0);
 
-    private final List<HAConnection> connectionList = new LinkedList<HAConnection>();
+    private final List<HAConnection> connectionList = new LinkedList<>();
 
     private final AcceptSocketService acceptSocketService;
 
@@ -170,17 +170,22 @@ public class HAService {
         return push2SlaveMaxOffset;
     }
 
+    /**
+     * Listens to slave connections to create {@link HAConnection}.
+     */
     class AcceptSocketService extends ServiceThread {
         private ServerSocketChannel serverSocketChannel;
         private Selector selector;
         private final SocketAddress socketAddressListen;
 
-
         public AcceptSocketService(final int port) {
             this.socketAddressListen = new InetSocketAddress(port);
         }
 
-
+        /**
+         * Starts listening to slave connections.
+         * @throws Exception If fails.
+         */
         public void beginAccept() throws Exception {
             this.serverSocketChannel = ServerSocketChannel.open();
             this.selector = RemotingUtil.openSelector();
@@ -190,6 +195,7 @@ public class HAService {
             this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
         }
 
+        /** {@inheritDoc} */
         @Override
         public void shutdown(final boolean interrupt) {
             super.shutdown(interrupt);
@@ -202,6 +208,7 @@ public class HAService {
             }
         }
 
+        /** {@inheritDoc} */
         @Override
         public void run() {
             log.info(this.getServiceName() + " service started");
@@ -210,10 +217,12 @@ public class HAService {
                 try {
                     this.selector.select(1000);
                     Set<SelectionKey> selected = this.selector.selectedKeys();
+
                     if (selected != null) {
                         for (SelectionKey k : selected) {
                             if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                 SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
+
                                 if (sc != null) {
                                     HAService.log.info("HAService receive new connection, "
                                             + sc.socket().getRemoteSocketAddress());
@@ -234,16 +243,15 @@ public class HAService {
 
                         selected.clear();
                     }
-
                 } catch (Exception e) {
                     log.error(this.getServiceName() + " service has exception.", e);
                 }
             }
 
-            log.error(this.getServiceName() + " service end");
+            log.info(this.getServiceName() + " service end");
         }
 
-
+        /** {@inheritDoc} */
         @Override
         public String getServiceName() {
             return AcceptSocketService.class.getSimpleName();
@@ -256,8 +264,8 @@ public class HAService {
     class GroupTransferService extends ServiceThread {
 
         private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
-        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
-        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
+        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>();
+        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>();
 
 
         public void putRequest(final GroupCommitRequest request) {
@@ -333,7 +341,7 @@ public class HAService {
 
     class HAClient extends ServiceThread {
         private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
-        private final AtomicReference<String> masterAddress = new AtomicReference<String>();
+        private final AtomicReference<String> masterAddress = new AtomicReference<>();
         private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
         private SocketChannel socketChannel;
         private Selector selector;


[09/10] incubator-rocketmq git commit: Merge branch 'master' into spec

Posted by vi...@apache.org.
Merge branch 'master' into spec


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/35049678
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/35049678
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/35049678

Branch: refs/heads/spec
Commit: 35049678309969d57bcc5c7c116076ee4698bd72
Parents: ccdbdf5 1356e35
Author: vintagewang <vi...@apache.org>
Authored: Wed Dec 28 13:39:58 2016 +0800
Committer: vintagewang <vi...@apache.org>
Committed: Wed Dec 28 13:39:58 2016 +0800

----------------------------------------------------------------------
 .../broker/processor/PullMessageProcessor.java  |  4 +-
 .../broker/processor/SendMessageProcessor.java  |  3 +-
 .../rocketmq/broker/BrokerControllerTest.java   | 10 ++-
 .../rocketmq/broker/api/SendMessageTest.java    | 32 ++++------
 .../broker/topic/TopicConfigManagerTest.java    | 12 ++--
 .../alibaba/rocketmq/common/BrokerConfig.java   |  9 +++
 .../remoting/netty/NettyRemotingAbstract.java   | 67 +++++++++++---------
 .../rocketmq/remoting/NettyConnectionTest.java  | 52 +++++++++++++++
 .../store/AllocateMappedFileService.java        |  2 +-
 .../alibaba/rocketmq/store/MappedFileQueue.java | 22 +++++--
 .../alibaba/rocketmq/store/ha/HAService.java    | 31 +++++----
 .../alibaba/rocketmq/store/index/IndexFile.java |  5 +-
 .../rocketmq/store/index/IndexService.java      | 29 +++++----
 .../rocketmq/store/MappedFileQueueTest.java     | 17 ++++-
 .../rocketmq/store/index/IndexFileTest.java     | 18 ++++--
 .../src/test/resources/logback-test.xml         |  3 +-
 16 files changed, 212 insertions(+), 104 deletions(-)
----------------------------------------------------------------------



[04/10] incubator-rocketmq git commit: [ROCKETMQ-9] Errors in rocketmq-store module.

Posted by vi...@apache.org.
[ROCKETMQ-9] Errors in rocketmq-store module.

JIRA issue: https://issues.apache.org/jira/browse/ROCKETMQ-9


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/d1fa8694
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/d1fa8694
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/d1fa8694

Branch: refs/heads/spec
Commit: d1fa8694ff82b429dcd811156edf7ea8a702237e
Parents: fed0976
Author: shtykh_roman <rs...@yahoo.com>
Authored: Mon Dec 26 18:32:00 2016 +0900
Committer: Willem Jiang <wi...@gmail.com>
Committed: Tue Dec 27 09:50:23 2016 +0800

----------------------------------------------------------------------
 .../store/AllocateMappedFileService.java        |  2 +-
 .../alibaba/rocketmq/store/MappedFileQueue.java | 26 ++++++++++------
 .../alibaba/rocketmq/store/index/IndexFile.java |  5 ++--
 .../rocketmq/store/index/IndexService.java      | 31 +++++++++++---------
 .../rocketmq/store/MappedFileQueueTest.java     | 17 +++++++++--
 .../rocketmq/store/index/IndexFileTest.java     | 18 +++++++-----
 .../src/test/resources/logback-test.xml         |  2 +-
 7 files changed, 63 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
index 40eee7a..06113c8 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
@@ -213,7 +213,7 @@ public class AllocateMappedFileService extends ServiceThread {
                 isSuccess = true;
             }
         } catch (InterruptedException e) {
-            log.warn(this.getServiceName() + " service has exception, maybe by shutdown");
+            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
             this.hasException = true;
             return false;
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
index 2b006c0..8d9d3ab 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
@@ -459,27 +459,35 @@ public class MappedFileQueue {
         return result;
     }
 
-
+    /**
+     * Finds a mapped file by offset.
+     *
+     * @param offset Offset.
+     * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
+     * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
+     */
     public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
         try {
             MappedFile mappedFile = this.getFirstMappedFile();
             if (mappedFile != null) {
                 int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
                 if (index < 0 || index >= this.mappedFiles.size()) {
-                    LOG_ERROR.warn("findMappedFileByOffset offset not matched, request Offset: {}, index: {}, mappedFileSize: {}, mappedFiles count: {}, StackTrace: {}",
-                            offset,
-                            index,
-                            this.mappedFileSize,
-                            this.mappedFiles.size(),
-                            UtilAll.currentStackTrace());
+                    LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
+                        "mappedFileSize: {}, mappedFiles count: {}",
+                        mappedFile,
+                        offset,
+                        index,
+                        this.mappedFileSize,
+                        this.mappedFiles.size());
                 }
 
                 try {
                     return this.mappedFiles.get(index);
                 } catch (Exception e) {
-                    if (returnFirstOnNotFound) {
+                    if (returnFirstOnNotFound)
                         return mappedFile;
-                    }
+
+                    LOG_ERROR.warn("findMappedFileByOffset failure. {}", UtilAll.currentStackTrace());
                 }
             }
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
index f353320..befa5f9 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
@@ -94,7 +94,6 @@ public class IndexFile {
         return this.indexHeader.getIndexCount() >= this.indexNum;
     }
 
-
     public boolean destroy(final long intervalForcibly) {
         return this.mappedFile.destroy(intervalForcibly);
     }
@@ -167,8 +166,8 @@ public class IndexFile {
                 }
             }
         } else {
-            log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num "
-                    + this.indexNum);
+            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+                + "; index max num = " + this.indexNum);
         }
 
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
index f275f80..fded747 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
@@ -49,6 +49,9 @@ public class IndexService {
     private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
+    /** Maximum times to attempt index file creation. */
+    private static final int MAX_TRY_IDX_CREATE = 3;
+
 
     public IndexService(final DefaultMessageStore store) {
         this.defaultMessageStore = store;
@@ -257,44 +260,44 @@ public class IndexService {
 
 
     private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
-        for (boolean ok =
-             indexFile.putKey(idxKey, msg.getCommitLogOffset(),
-                     msg.getStoreTimestamp()); !ok; ) {
-            log.warn("index file full, so create another one, " + indexFile.getFileName());
+        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
+            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
+
             indexFile = retryGetAndCreateIndexFile();
             if (null == indexFile) {
                 return null;
             }
 
-            ok =
-                    indexFile.putKey(idxKey, msg.getCommitLogOffset(),
-                            msg.getStoreTimestamp());
+            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
         }
+
         return indexFile;
     }
 
-
-    public IndexFile retryGetAndCreateIndexFile() {
+    /**
+     * Retries to get or create index file.
+     *
+     * @return {@link IndexFile} or null on failure.
+     */
+    private IndexFile retryGetAndCreateIndexFile() {
         IndexFile indexFile = null;
 
-
-        for (int times = 0; null == indexFile && times < 3; times++) {
+        for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
             indexFile = this.getAndCreateLastIndexFile();
             if (null != indexFile)
                 break;
 
             try {
-                log.error("try to create index file, " + times + " times");
+                log.error("Tried to create index file " + times + " times");
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
 
-
         if (null == indexFile) {
             this.defaultMessageStore.getAccessRights().makeIndexFileError();
-            log.error("mark index file can not build flag");
+            log.error("Mark index file cannot build flag");
         }
 
         return indexFile;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
index 89b37be..700f1c6 100644
--- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
@@ -52,6 +52,7 @@ public class MappedFileQueueTest {
     @Test
     public void test_getLastMapedFile() {
         final String fixedMsg = "0123456789abcdef";
+
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
                 new MappedFileQueue("target/unit_test_store/a/", 1024, null);
@@ -59,6 +60,7 @@ public class MappedFileQueueTest {
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
             assertTrue(mappedFile != null);
+
             boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
             if (!result) {
                 logger.debug("appendMessage " + i);
@@ -74,7 +76,9 @@ public class MappedFileQueueTest {
 
     @Test
     public void test_findMapedFileByOffset() {
+        // four-byte string.
         final String fixedMsg = "abcd";
+
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
                 new MappedFileQueue("target/unit_test_store/b/", 1024, null);
@@ -82,11 +86,13 @@ public class MappedFileQueueTest {
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
             assertTrue(mappedFile != null);
+
             boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
-            // logger.debug("appendMessage " + bytes);
             assertTrue(result);
         }
 
+        assertEquals(fixedMsg.getBytes().length * 1024, mappedFileQueue.getMappedMemorySize());
+
         MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0);
         assertTrue(mappedFile != null);
         assertEquals(mappedFile.getFileFromOffset(), 0);
@@ -110,7 +116,8 @@ public class MappedFileQueueTest {
         mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100);
         assertTrue(mappedFile != null);
         assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
-        
+
+        // over mapped memory size.
         mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4);
         assertTrue(mappedFile == null);
 
@@ -125,6 +132,7 @@ public class MappedFileQueueTest {
     @Test
     public void test_commit() {
         final String fixedMsg = "0123456789abcdef";
+
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
                 new MappedFileQueue("target/unit_test_store/c/", 1024, null);
@@ -132,6 +140,7 @@ public class MappedFileQueueTest {
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
             assertTrue(mappedFile != null);
+
             boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
             assertTrue(result);
         }
@@ -168,6 +177,7 @@ public class MappedFileQueueTest {
     @Test
     public void test_getMapedMemorySize() {
         final String fixedMsg = "abcd";
+
         logger.debug("================================================================");
         MappedFileQueue mappedFileQueue =
                 new MappedFileQueue("target/unit_test_store/d/", 1024, null);
@@ -175,14 +185,15 @@ public class MappedFileQueueTest {
         for (int i = 0; i < 1024; i++) {
             MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
             assertTrue(mappedFile != null);
+
             boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
             assertTrue(result);
         }
 
         assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize());
+
         mappedFileQueue.shutdown(1000);
         mappedFileQueue.destroy();
         logger.debug("MappedFileQueue.getMappedMemorySize() OK");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
index f6bfc0a..9e446f7 100644
--- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
@@ -31,17 +31,18 @@ import static org.junit.Assert.assertTrue;
 
 
 public class IndexFileTest {
-    private static final int hashSlotNum = 100;
-    private static final int indexNum = 400;
+    private static final int HASH_SLOT_NUM = 100;
+    private static final int INDEX_NUM = 400;
 
     @Test
     public void test_put_index() throws Exception {
-        IndexFile indexFile = new IndexFile("100", hashSlotNum, indexNum, 0, 0);
-        for (long i = 0; i < (indexNum - 1); i++) {
+        IndexFile indexFile = new IndexFile("100", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
+        for (long i = 0; i < (INDEX_NUM - 1); i++) {
             boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
             assertTrue(putResult);
         }
-    
+
+        // put over index file capacity.
         boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
         assertFalse(putResult);
     
@@ -51,12 +52,14 @@ public class IndexFileTest {
 
     @Test
     public void test_put_get_index() throws Exception {
-        IndexFile indexFile = new IndexFile("200", hashSlotNum, indexNum, 0, 0);
+        IndexFile indexFile = new IndexFile("200", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
     
-        for (long i = 0; i < (indexNum - 1); i++) {
+        for (long i = 0; i < (INDEX_NUM - 1); i++) {
             boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
             assertTrue(putResult);
         }
+
+        // put over index file capacity.
         boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
         assertFalse(putResult);
     
@@ -64,6 +67,7 @@ public class IndexFileTest {
         indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
         assertFalse(phyOffsets.isEmpty());
         assertEquals(1, phyOffsets.size());
+
         indexFile.destroy(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/d1fa8694/rocketmq-store/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/resources/logback-test.xml b/rocketmq-store/src/test/resources/logback-test.xml
index acdfa10..11d429d 100644
--- a/rocketmq-store/src/test/resources/logback-test.xml
+++ b/rocketmq-store/src/test/resources/logback-test.xml
@@ -27,7 +27,7 @@
     <appender-ref ref="STDOUT" />
   </logger>
 
-  <root level="WARN">
+  <root level="ERROR">
     <appender-ref ref="STDOUT" />
   </root>