You are viewing a plain text version of this content. The canonical version is the original message in the mailing-list archive.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2018/05/23 06:21:35 UTC

[rocketmq] 02/04: Tag language of clients initialized through OMS as 'OMS'

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 9c0e5360e109b2a5c4c86ed7053a59f868b078ee
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon May 21 11:30:12 2018 +0800

    Tag language of clients initialized through OMS as 'OMS'
---
 .../java/org/apache/rocketmq/client/ClientConfig.java   | 15 ++++++++++++++-
 .../apache/rocketmq/client/impl/MQClientAPIImpl.java    | 17 ++++++-----------
 .../rocketmq/consumer/PullConsumerImpl.java             |  5 ++++-
 .../rocketmq/consumer/PushConsumerImpl.java             |  2 ++
 .../rocketmq/producer/AbstractOMSProducer.java          |  4 +++-
 .../apache/rocketmq/remoting/protocol/LanguageCode.java |  3 ++-
 6 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index a9eabfe..d798164 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -20,6 +20,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 /**
  * Client Common configuration
@@ -48,6 +49,8 @@ public class ClientConfig {
 
     private boolean useTLS = TlsSystemConfig.tlsEnable;
 
+    private LanguageCode language = LanguageCode.JAVA;
+
     public String buildMQClientId() {
         StringBuilder sb = new StringBuilder();
         sb.append(this.getClientIP());
@@ -96,6 +99,7 @@ public class ClientConfig {
         this.unitName = cc.unitName;
         this.vipChannelEnabled = cc.vipChannelEnabled;
         this.useTLS = cc.useTLS;
+        this.language = cc.language;
     }
 
     public ClientConfig cloneClientConfig() {
@@ -111,6 +115,7 @@ public class ClientConfig {
         cc.unitName = unitName;
         cc.vipChannelEnabled = vipChannelEnabled;
         cc.useTLS = useTLS;
+        cc.language = language;
         return cc;
     }
 
@@ -186,12 +191,20 @@ public class ClientConfig {
         this.useTLS = useTLS;
     }
 
+    public LanguageCode getLanguage() {
+        return language;
+    }
+
+    public void setLanguage(LanguageCode language) {
+        this.language = language;
+    }
+
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
             + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
             + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
             + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
-            + vipChannelEnabled + ", useTLS=" + useTLS + "]";
+            + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]";
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index d4ed1ec..ade6990 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.client.impl;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -49,7 +48,6 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -137,6 +135,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
@@ -156,7 +155,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 public class MQClientAPIImpl {
 
     private final static InternalLogger log = ClientLogger.getLog();
-    public static boolean sendSmartMsg =
+    private static boolean sendSmartMsg =
         Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
 
     static {
@@ -217,13 +216,9 @@ public class MQClientAPIImpl {
     }
 
     public void updateNameServerAddressList(final String addrs) {
-        List<String> lst = new ArrayList<String>();
         String[] addrArray = addrs.split(";");
-        for (String addr : addrArray) {
-            lst.add(addr);
-        }
-
-        this.remotingClient.updateNameServerAddressList(lst);
+        List<String> list = Arrays.asList(addrArray);
+        this.remotingClient.updateNameServerAddressList(list);
     }
 
     public void start() {
@@ -857,7 +852,7 @@ public class MQClientAPIImpl {
         final long timeoutMillis
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
-
+        request.setLanguage(clientConfig.getLanguage());
         request.setBody(heartbeatData.encode());
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
         assert response != null;
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index c11da58..d673510 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 public class PullConsumerImpl implements PullConsumer {
     private final DefaultMQPullConsumer rocketmqPullConsumer;
@@ -46,7 +47,7 @@ public class PullConsumerImpl implements PullConsumer {
     private final LocalMessageCache localMessageCache;
     private final ClientConfig clientConfig;
 
-    final static InternalLogger log = ClientLogger.getLog();
+    private final static InternalLogger log = ClientLogger.getLog();
 
     public PullConsumerImpl(final KeyValue properties) {
         this.properties = properties;
@@ -77,6 +78,8 @@ public class PullConsumerImpl implements PullConsumer {
         this.rocketmqPullConsumer.setInstanceName(consumerId);
         properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
 
+        this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
+
         this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
     }
 
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index 46f6775..d5d394a 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 public class PushConsumerImpl implements PushConsumer {
     private final DefaultMQPushConsumer rocketmqPushConsumer;
@@ -73,6 +74,7 @@ public class PushConsumerImpl implements PushConsumer {
         String consumerId = OMSUtil.buildInstanceName();
         this.rocketmqPushConsumer.setInstanceName(consumerId);
         properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
+        this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);
 
         this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
     }
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index e40e2d4..53fc0f9 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
 
@@ -45,7 +46,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
     final KeyValue properties;
     final DefaultMQProducer rocketmqProducer;
     private boolean started = false;
-    final ClientConfig clientConfig;
+    private final ClientConfig clientConfig;
 
     AbstractOMSProducer(final KeyValue properties) {
         this.properties = properties;
@@ -67,6 +68,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
         this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout());
         this.rocketmqProducer.setInstanceName(producerId);
         this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
+        this.rocketmqProducer.setLanguage(LanguageCode.OMS);
         properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
     }
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
index 17ce919..4382af3 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
@@ -28,7 +28,8 @@ public enum LanguageCode {
     OTHER((byte) 7),
     HTTP((byte) 8),
     GO((byte) 9),
-    PHP((byte) 10);
+    PHP((byte) 10),
+    OMS((byte) 11);
 
     private byte code;
 

-- 
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.