You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2018/05/23 06:21:33 UTC

[rocketmq] branch develop updated (4a4a821 -> c264de9)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


    omit 4a4a821  Format output
    omit 7af7d0f  Show client IP only when required
    omit 2a715e5  Tag language of clients initialized through OMS as 'OMS'
    omit 07b5858  Use uber jar for netty-tcnative
     new 12a51c4  Use uber jar for netty-tcnative
     new 9c0e536  Tag language of clients initialized through OMS as 'OMS'
     new 0499586  Show client IP only when required
     new c264de9  Format output

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (4a4a821)
            \
             N -- N -- N   refs/heads/develop (c264de9)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:

-- 
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.

[rocketmq] 03/04: Show client IP only when required

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 0499586eb0c78c35099dfe9842d691ed27e791c2
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue May 22 20:02:41 2018 +0800

    Show client IP only when required
---
 .../consumer/ConsumerProgressSubCommand.java        | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index 67a9197..7529601 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -64,6 +64,10 @@ public class ConsumerProgressSubCommand implements SubCommand {
         opt.setRequired(false);
         options.addOption(opt);
 
+        Option optionShowClientIP = new Option("s", "showClientIP", true, "Show Client IP per Queue");
+        optionShowClientIP.setRequired(false);
+        options.addOption(optionShowClientIP);
+
         return options;
     }
 
@@ -92,13 +96,22 @@ public class ConsumerProgressSubCommand implements SubCommand {
 
         try {
             defaultMQAdminExt.start();
+
+            boolean showClientIP = commandLine.hasOption('s')
+                && "true".equalsIgnoreCase(commandLine.getOptionValue('s'));
+
             if (commandLine.hasOption('g')) {
                 String consumerGroup = commandLine.getOptionValue('g').trim();
                 ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);
                 List<MessageQueue> mqList = new LinkedList<MessageQueue>();
                 mqList.addAll(consumeStats.getOffsetTable().keySet());
                 Collections.sort(mqList);
-                Map<MessageQueue, String> messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
+
+                Map<MessageQueue, String> messageQueueAllocationResult = null;
+                if (showClientIP) {
+                    messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
+                }
+
                 System.out.printf("%-32s  %-32s  %-4s  %-20s  %-20s  %-20s %-20s  %s%n",
                     "#Topic",
                     "#Broker Name",
@@ -120,7 +133,11 @@ public class ConsumerProgressSubCommand implements SubCommand {
                     } catch (Exception e) {
                     }
 
-                    String clientIP = messageQueueAllocationResult.get(mq);
+                    String clientIP = null;
+                    if (showClientIP) {
+                        clientIP = messageQueueAllocationResult.get(mq);
+                    }
+
                     System.out.printf("%-32s  %-32s  %-4d  %-20d  %-20d  %-20s %-20d  %s%n",
                         UtilAll.frontStringAtLeast(mq.getTopic(), 32),
                         UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),

-- 
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.

[rocketmq] 01/04: Use uber jar for netty-tcnative

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 12a51c41942ec64046b7efe982e90d6fbadfaaaf
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu May 17 13:23:42 2018 +0800

    Use uber jar for netty-tcnative
---
 pom.xml          |  5 -----
 remoting/pom.xml | 15 ++-------------
 2 files changed, 2 insertions(+), 18 deletions(-)

diff --git a/pom.xml b/pom.xml
index d26e78f..1f9ce56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -535,11 +535,6 @@
                 <version>${project.version}</version>
             </dependency>
             <dependency>
-                <groupId>${project.groupId}</groupId>
-                <artifactId>rocketmq-logging</artifactId>
-                <version>${project.version}</version>
-            </dependency>
-            <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
                 <version>1.7.7</version>
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 21d9c8d..5558913 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -47,19 +47,8 @@
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
-            <artifactId>netty-tcnative</artifactId>
-            <version>1.1.33.Fork22</version>
-            <classifier>${os.detected.classifier}</classifier>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+            <version>1.1.33.Fork26</version>
         </dependency>
     </dependencies>
-
-    <build>
-        <extensions>
-            <extension>
-                <groupId>kr.motd.maven</groupId>
-                <artifactId>os-maven-plugin</artifactId>
-                <version>1.4.0.Final</version>
-            </extension>
-        </extensions>
-    </build>
 </project>

-- 
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.

[rocketmq] 04/04: Format output

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit c264de9c9345c6526a0f1f16088ffaf0bbefaf12
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue May 22 20:12:00 2018 +0800

    Format output
---
 .../tools/command/consumer/ConsumerProgressSubCommand.java        | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index 7529601..a1b3c1a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -129,7 +129,11 @@ public class ConsumerProgressSubCommand implements SubCommand {
                     diffTotal += diff;
                     String lastTime = "";
                     try {
-                        lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
+                        if (offsetWrapper.getLastTimestamp() == 0) {
+                            lastTime = "N/A";
+                        } else {
+                            lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
+                        }
                     } catch (Exception e) {
                     }
 
@@ -144,7 +148,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
                         mq.getQueueId(),
                         offsetWrapper.getBrokerOffset(),
                         offsetWrapper.getConsumerOffset(),
-                        null != clientIP ? clientIP : "NA",
+                        null != clientIP ? clientIP : "N/A",
                         diff,
                         lastTime
                     );

-- 
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.

[rocketmq] 02/04: Tag language of clients initialized through OMS as 'OMS'

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 9c0e5360e109b2a5c4c86ed7053a59f868b078ee
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon May 21 11:30:12 2018 +0800

    Tag language of clients initialized through OMS as 'OMS'
---
 .../java/org/apache/rocketmq/client/ClientConfig.java   | 15 ++++++++++++++-
 .../apache/rocketmq/client/impl/MQClientAPIImpl.java    | 17 ++++++-----------
 .../rocketmq/consumer/PullConsumerImpl.java             |  5 ++++-
 .../rocketmq/consumer/PushConsumerImpl.java             |  2 ++
 .../rocketmq/producer/AbstractOMSProducer.java          |  4 +++-
 .../apache/rocketmq/remoting/protocol/LanguageCode.java |  3 ++-
 6 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index a9eabfe..d798164 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -20,6 +20,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 /**
  * Client Common configuration
@@ -48,6 +49,8 @@ public class ClientConfig {
 
     private boolean useTLS = TlsSystemConfig.tlsEnable;
 
+    private LanguageCode language = LanguageCode.JAVA;
+
     public String buildMQClientId() {
         StringBuilder sb = new StringBuilder();
         sb.append(this.getClientIP());
@@ -96,6 +99,7 @@ public class ClientConfig {
         this.unitName = cc.unitName;
         this.vipChannelEnabled = cc.vipChannelEnabled;
         this.useTLS = cc.useTLS;
+        this.language = cc.language;
     }
 
     public ClientConfig cloneClientConfig() {
@@ -111,6 +115,7 @@ public class ClientConfig {
         cc.unitName = unitName;
         cc.vipChannelEnabled = vipChannelEnabled;
         cc.useTLS = useTLS;
+        cc.language = language;
         return cc;
     }
 
@@ -186,12 +191,20 @@ public class ClientConfig {
         this.useTLS = useTLS;
     }
 
+    public LanguageCode getLanguage() {
+        return language;
+    }
+
+    public void setLanguage(LanguageCode language) {
+        this.language = language;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
             + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
             + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
             + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
-            + vipChannelEnabled + ", useTLS=" + useTLS + "]";
+            + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]";
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index d4ed1ec..ade6990 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.client.impl;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -49,7 +48,6 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -137,6 +135,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
@@ -156,7 +155,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 public class MQClientAPIImpl {
 
     private final static InternalLogger log = ClientLogger.getLog();
-    public static boolean sendSmartMsg =
+    private static boolean sendSmartMsg =
         Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
 
     static {
@@ -217,13 +216,9 @@ public class MQClientAPIImpl {
     }
 
     public void updateNameServerAddressList(final String addrs) {
-        List<String> lst = new ArrayList<String>();
         String[] addrArray = addrs.split(";");
-        for (String addr : addrArray) {
-            lst.add(addr);
-        }
-
-        this.remotingClient.updateNameServerAddressList(lst);
+        List<String> list = Arrays.asList(addrArray);
+        this.remotingClient.updateNameServerAddressList(list);
     }
 
     public void start() {
@@ -857,7 +852,7 @@ public class MQClientAPIImpl {
         final long timeoutMillis
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
-
+        request.setLanguage(clientConfig.getLanguage());
         request.setBody(heartbeatData.encode());
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
         assert response != null;
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index c11da58..d673510 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 public class PullConsumerImpl implements PullConsumer {
     private final DefaultMQPullConsumer rocketmqPullConsumer;
@@ -46,7 +47,7 @@ public class PullConsumerImpl implements PullConsumer {
     private final LocalMessageCache localMessageCache;
     private final ClientConfig clientConfig;
 
-    final static InternalLogger log = ClientLogger.getLog();
+    private final static InternalLogger log = ClientLogger.getLog();
 
     public PullConsumerImpl(final KeyValue properties) {
         this.properties = properties;
@@ -77,6 +78,8 @@ public class PullConsumerImpl implements PullConsumer {
         this.rocketmqPullConsumer.setInstanceName(consumerId);
         properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
 
+        this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
+
         this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
     }
 
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index 46f6775..d5d394a 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 public class PushConsumerImpl implements PushConsumer {
     private final DefaultMQPushConsumer rocketmqPushConsumer;
@@ -73,6 +74,7 @@ public class PushConsumerImpl implements PushConsumer {
         String consumerId = OMSUtil.buildInstanceName();
         this.rocketmqPushConsumer.setInstanceName(consumerId);
         properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
+        this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);
 
         this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
     }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index e40e2d4..53fc0f9 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
 
@@ -45,7 +46,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
     final KeyValue properties;
     final DefaultMQProducer rocketmqProducer;
     private boolean started = false;
-    final ClientConfig clientConfig;
+    private final ClientConfig clientConfig;
 
     AbstractOMSProducer(final KeyValue properties) {
         this.properties = properties;
@@ -67,6 +68,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
         this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout());
         this.rocketmqProducer.setInstanceName(producerId);
         this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
+        this.rocketmqProducer.setLanguage(LanguageCode.OMS);
         properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
     }
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
index 17ce919..4382af3 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
@@ -28,7 +28,8 @@ public enum LanguageCode {
     OTHER((byte) 7),
     HTTP((byte) 8),
     GO((byte) 9),
-    PHP((byte) 10);
+    PHP((byte) 10),
+    OMS((byte) 11);
 
     private byte code;
 

-- 
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.