You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2018/05/23 06:21:35 UTC
[rocketmq] 02/04: Tag language of clients initialized through OMS as 'OMS'
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 9c0e5360e109b2a5c4c86ed7053a59f868b078ee
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon May 21 11:30:12 2018 +0800
Tag language of clients initialized through OMS as 'OMS'
---
.../java/org/apache/rocketmq/client/ClientConfig.java | 15 ++++++++++++++-
.../apache/rocketmq/client/impl/MQClientAPIImpl.java | 17 ++++++-----------
.../rocketmq/consumer/PullConsumerImpl.java | 5 ++++-
.../rocketmq/consumer/PushConsumerImpl.java | 2 ++
.../rocketmq/producer/AbstractOMSProducer.java | 4 +++-
.../apache/rocketmq/remoting/protocol/LanguageCode.java | 3 ++-
6 files changed, 31 insertions(+), 15 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index a9eabfe..d798164 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -20,6 +20,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
/**
* Client Common configuration
@@ -48,6 +49,8 @@ public class ClientConfig {
private boolean useTLS = TlsSystemConfig.tlsEnable;
+ private LanguageCode language = LanguageCode.JAVA;
+
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
@@ -96,6 +99,7 @@ public class ClientConfig {
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
this.useTLS = cc.useTLS;
+ this.language = cc.language;
}
public ClientConfig cloneClientConfig() {
@@ -111,6 +115,7 @@ public class ClientConfig {
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
cc.useTLS = useTLS;
+ cc.language = language;
return cc;
}
@@ -186,12 +191,20 @@ public class ClientConfig {
this.useTLS = useTLS;
}
+ public LanguageCode getLanguage() {
+ return language;
+ }
+
+ public void setLanguage(LanguageCode language) {
+ this.language = language;
+ }
+
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
- + vipChannelEnabled + ", useTLS=" + useTLS + "]";
+ + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]";
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index d4ed1ec..ade6990 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.client.impl;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -49,7 +48,6 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -137,6 +135,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
@@ -156,7 +155,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class MQClientAPIImpl {
private final static InternalLogger log = ClientLogger.getLog();
- public static boolean sendSmartMsg =
+ private static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
static {
@@ -217,13 +216,9 @@ public class MQClientAPIImpl {
}
public void updateNameServerAddressList(final String addrs) {
- List<String> lst = new ArrayList<String>();
String[] addrArray = addrs.split(";");
- for (String addr : addrArray) {
- lst.add(addr);
- }
-
- this.remotingClient.updateNameServerAddressList(lst);
+ List<String> list = Arrays.asList(addrArray);
+ this.remotingClient.updateNameServerAddressList(list);
}
public void start() {
@@ -857,7 +852,7 @@ public class MQClientAPIImpl {
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
-
+ request.setLanguage(clientConfig.getLanguage());
request.setBody(heartbeatData.encode());
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index c11da58..d673510 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
@@ -46,7 +47,7 @@ public class PullConsumerImpl implements PullConsumer {
private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
- final static InternalLogger log = ClientLogger.getLog();
+ private final static InternalLogger log = ClientLogger.getLog();
public PullConsumerImpl(final KeyValue properties) {
this.properties = properties;
@@ -77,6 +78,8 @@ public class PullConsumerImpl implements PullConsumer {
this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
+ this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
+
this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index 46f6775..d5d394a 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class PushConsumerImpl implements PushConsumer {
private final DefaultMQPushConsumer rocketmqPushConsumer;
@@ -73,6 +74,7 @@ public class PushConsumerImpl implements PushConsumer {
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
+ this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);
this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index e40e2d4..53fc0f9 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
@@ -45,7 +46,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
final KeyValue properties;
final DefaultMQProducer rocketmqProducer;
private boolean started = false;
- final ClientConfig clientConfig;
+ private final ClientConfig clientConfig;
AbstractOMSProducer(final KeyValue properties) {
this.properties = properties;
@@ -67,6 +68,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout());
this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
+ this.rocketmqProducer.setLanguage(LanguageCode.OMS);
properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
index 17ce919..4382af3 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
@@ -28,7 +28,8 @@ public enum LanguageCode {
OTHER((byte) 7),
HTTP((byte) 8),
GO((byte) 9),
- PHP((byte) 10);
+ PHP((byte) 10),
+ OMS((byte) 11);
private byte code;
--
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.