You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/09/11 13:10:22 UTC

[GitHub] duhengforever closed pull request #458: DefaultMQProducter.start()注释

duhengforever closed pull request #458: DefaultMQProducter.start()注释
URL: https://github.com/apache/rocketmq/pull/458
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the diff is reproduced below:

diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 5567e49b5..82000c22a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -47,7 +47,10 @@ public static String getGroupWithRegularExpression(String origin, String pattern
     }
 
     /**
-     * Validate group
+     * 校验分组名
+     *  1、不能为空
+     *  2、组名正则表达式判断
+     *  3、组名长度校验
      */
     public static void checkGroup(String group) throws MQClientException {
         if (UtilAll.isBlank(group)) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 25877d738..64a3aba56 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -25,10 +25,15 @@
 import org.apache.rocketmq.remoting.RPCHook;
 import org.slf4j.Logger;
 
+/***
+ * Manages the MQClientInstance objects for the whole application;
+ * each application instance on each IP maps to one MQClientInstance.
+ */
 public class MQClientManager {
     private final static Logger log = ClientLogger.getLog();
     private static MQClientManager instance = new MQClientManager();
     private AtomicInteger factoryIndexGenerator = new AtomicInteger();
+    // Stores client instances keyed by clientId; each clientId has at most one instance and is never created twice
     private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
         new ConcurrentHashMap<String, MQClientInstance>();
 
@@ -44,18 +49,26 @@ public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientCo
         return getAndCreateMQClientInstance(clientConfig, null);
     }
 
+    /***
+     * Maintains client instances by clientId, where clientId = ip + "@" + instanceName,
+     * so requests issued from the same IP address generally share one MQClientInstance.
+     * @param clientConfig
+     * @param rpcHook
+     * @return
+     */
     public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
-        String clientId = clientConfig.buildMQClientId();
+        String clientId = clientConfig.buildMQClientId();//根据客户端ip 创建客户端ID:ip+"@"+instanceName
         MQClientInstance instance = this.factoryTable.get(clientId);
         if (null == instance) {
             instance =
                 new MQClientInstance(clientConfig.cloneClientConfig(),
                     this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
+            // Only one MQClientInstance may exist per clientId, so putIfAbsent keeps any instance registered first
             MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
-            if (prev != null) {
+            if (prev != null) {//表示该ip下已经有MQClientInstance,直接使用即可
                 instance = prev;
                 log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
-            } else {
+            } else {//表示新建MQClientInstance
                 log.info("Created new MQClientInstance for clientId:[{}]", clientId);
             }
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 83b9ee767..bf2efb56f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -89,6 +89,9 @@
     private final int instanceIndex;
     private final String clientId;
     private final long bootTimestamp = System.currentTimeMillis();
+    /***
+     * Maps each producer group to its producer; one group can map to only one producer.
+     */
     private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
     private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
     private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
@@ -124,11 +127,14 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli
 
     public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
         this.clientConfig = clientConfig;
-        this.instanceIndex = instanceIndex;
+        this.instanceIndex = instanceIndex;//一个递增的值
         this.nettyClientConfig = new NettyClientConfig();
+        //设置客户端回调函数的线程数,通过ClientCallbackExecutorThreads配置
         this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
         this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
+        //创建一个客户端远程处理器
         this.clientRemotingProcessor = new ClientRemotingProcessor(this);
+        //创建一个MQClientAPIImpl
         this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
 
         if (this.clientConfig.getNamesrvAddr() != null) {
@@ -139,14 +145,14 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli
         this.clientId = clientId;
 
         this.mQAdminImpl = new MQAdminImpl(this);
-
+        //创建拉取消息的服务
         this.pullMessageService = new PullMessageService(this);
-
+        //创建负载均衡的服务
         this.rebalanceService = new RebalanceService(this);
-
+        //创建默认的内部 CLIENT_INNER_PRODUCER
         this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
         this.defaultMQProducer.resetClientConfig(clientConfig);
-
+        //创建消费者状态管理
         this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
 
         log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}",
@@ -905,7 +911,7 @@ private void unregisterClient(final String producerGroup, final String consumerG
             }
         }
     }
-
+    // Registers {group, producer} in this MQClientInstance's producerTable; registration fails if the group already exists
     public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
         if (null == group || null == producer) {
             return false;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 7c1697967..a7ae91dc2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -139,20 +139,22 @@ public void start() throws MQClientException {
     }
 
     public void start(final boolean startFactory) throws MQClientException {
+        //根据服务当前状态进行相关操作
         switch (this.serviceState) {
-            case CREATE_JUST:
+            case CREATE_JUST://服务状态是新建的
+                //更新服务状态(这里START_FAILED相当于启动中)
                 this.serviceState = ServiceState.START_FAILED;
-
+                // Validate the producer configuration
                 this.checkConfig();
-
+                //组名不是CLIENT_INNER_PRODUCER,更新实例名称为PID
                 if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                     this.defaultMQProducer.changeInstanceNameToPID();
                 }
-
+                // Get the MQClientInstance for this producer's clientId, creating it if needed
                 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
-
+                // Register {group, producer} in MQClientInstance.producerTable
                 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
-                if (!registerOK) {
+                if (!registerOK) {//注册失败,将状态更新为CREATE_JUST,这样可以保证后期继续start
                     this.serviceState = ServiceState.CREATE_JUST;
                     throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                         + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
@@ -182,14 +184,15 @@ public void start(final boolean startFactory) throws MQClientException {
 
         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
     }
-
+    // Validates the producer configuration (only the group name is checked)
     private void checkConfig() throws MQClientException {
+        //校验组名
         Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
 
         if (null == this.defaultMQProducer.getProducerGroup()) {
             throw new MQClientException("producerGroup is null", null);
         }
-
+        //组名不能为DEFAULT_PRODUCER
         if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
             throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
                 null);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services