You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2022/07/17 12:58:55 UTC

[GitHub] [dubbo-spi-extensions] AlbumenJ commented on a diff in pull request #130: Support rocketmq protocol

AlbumenJ commented on code in PR #130:
URL: https://github.com/apache/dubbo-spi-extensions/pull/130#discussion_r922827907


##########
dubbo-api-docs/pom.xml:
##########
@@ -201,7 +201,7 @@
                         <dependencies>expand</dependencies>
                     </pomElements>
                 </configuration>
-                <executions>
+               <!--  <executions>

Review Comment:
   recover this line



##########
dubbo-api-docs/pom.xml:
##########
@@ -163,7 +163,7 @@
 
     <build>
         <plugins>
-            <plugin>
+           <!--  <plugin>

Review Comment:
   recover this line



##########
dubbo-registry-extensions/dubbo-registry-nameservice/pom.xml:
##########
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.dubbo.extensions</groupId>
+		<artifactId>dubbo-registry-extensions</artifactId>
+		<version>${revision}</version>
+	</parent>
+	<artifactId>dubbo-registry-nameservice</artifactId>
+	<name>dubbo-registry-nameservice</name>
+	<url>http://maven.apache.org</url>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.1</version>
+			<scope>test</scope>
+		</dependency>

Review Comment:
   this should be inherited from parent pom



##########
dubbo-registry-extensions/dubbo-registry-nameservice/pom.xml:
##########
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.dubbo.extensions</groupId>
+		<artifactId>dubbo-registry-extensions</artifactId>
+		<version>${revision}</version>
+	</parent>
+	<artifactId>dubbo-registry-nameservice</artifactId>

Review Comment:
   rename `dubbo-registry-nameservice` to `dubbo-registry-rmq-nameservice` or `dubbo-registry-rocketmq-nameservice`



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/main/java/org/apache/dubbo/registry/nameservice/NameServiceRegistry.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.dubbo.registry.nameservice;

Review Comment:
   Add ASF license header for all files



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/main/java/org/apache/dubbo/registry/nameservice/NameServiceRegistry.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.dubbo.registry.nameservice;
+
+import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.constants.RegistryConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+
+public class NameServiceRegistry extends FailbackRegistry {
+
+	private final Logger logger = LoggerFactory.getLogger(getClass());
+
+	private ScheduledExecutorService scheduledExecutorService;
+
+	private Map<URL, RegistryInfoWrapper> consumerRegistryInfoWrapperMap = new ConcurrentHashMap<>();
+
+	private MQClientInstance client;
+
+	private boolean isNotRoute = true;
+
+	private ClusterInfo clusterInfo;
+
+	private TopicList topicList;
+
+	private long timeoutMillis;
+	
+	private String nameservAddr;
+	
+	private Integer nameservPort;
+	
+	private String groupModel;
+	
+	private String instanceName;
+
+	public NameServiceRegistry(URL url) {
+		super(url);
+		this.nameservAddr = url.getHost();
+		this.nameservPort = url.getPort();
+		this.isNotRoute = url.getParameter("route", true);
+		if (this.isNotRoute) {
+			this.groupModel = url.getParameter("groupModel", "select");
+			this.timeoutMillis = url.getParameter("timeoutMillis", 3000);
+			this.instanceName = url.getParameter("instanceName");
+			ClientConfig clientConfig = new ClientConfig();
+			clientConfig.setNamesrvAddr( this.nameservAddr );
+			clientConfig.setInstanceName(instanceName);
+			client = MQClientManager.getInstance().getOrCreateMQClientInstance(clientConfig);
+			try {
+				this.initBeasInfo();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+				@Override
+				public Thread newThread(Runnable r) {
+					return new Thread(r, "dubbo-registry-nameservice");
+				}
+			});

Review Comment:
   use `NamedThreadFactory`



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/main/java/org/apache/dubbo/registry/nameservice/NameServiceRegistry.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.dubbo.registry.nameservice;
+
+import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.constants.RegistryConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+
+public class NameServiceRegistry extends FailbackRegistry {
+
+	private final Logger logger = LoggerFactory.getLogger(getClass());
+
+	private ScheduledExecutorService scheduledExecutorService;
+
+	private Map<URL, RegistryInfoWrapper> consumerRegistryInfoWrapperMap = new ConcurrentHashMap<>();
+
+	private MQClientInstance client;
+
+	private boolean isNotRoute = true;
+
+	private ClusterInfo clusterInfo;
+
+	private TopicList topicList;
+
+	private long timeoutMillis;
+	
+	private String nameservAddr;
+	
+	private Integer nameservPort;
+	
+	private String groupModel;
+	
+	private String instanceName;
+
+	public NameServiceRegistry(URL url) {
+		super(url);
+		this.nameservAddr = url.getHost();
+		this.nameservPort = url.getPort();
+		this.isNotRoute = url.getParameter("route", true);
+		if (this.isNotRoute) {
+			this.groupModel = url.getParameter("groupModel", "select");
+			this.timeoutMillis = url.getParameter("timeoutMillis", 3000);
+			this.instanceName = url.getParameter("instanceName");
+			ClientConfig clientConfig = new ClientConfig();
+			clientConfig.setNamesrvAddr( this.nameservAddr );
+			clientConfig.setInstanceName(instanceName);
+			client = MQClientManager.getInstance().getOrCreateMQClientInstance(clientConfig);
+			try {
+				this.initBeasInfo();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+				@Override
+				public Thread newThread(Runnable r) {
+					return new Thread(r, "dubbo-registry-nameservice");
+				}
+			});
+			scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						NameServiceRegistry.this.initBeasInfo();
+
+						if (consumerRegistryInfoWrapperMap.isEmpty()) {
+							return;
+						}
+						for (Entry<URL, RegistryInfoWrapper> e : consumerRegistryInfoWrapperMap.entrySet()) {
+							List<URL> urls = new ArrayList<URL>();
+							NameServiceRegistry.this.pullRoute(e.getValue().serviceName, e.getKey(), urls);
+							e.getValue().listener.notify(urls);
+						}
+					} catch (Exception e) {
+						logger.error("ScheduledTask pullRoute exception", e);
+					}
+				}
+			}, 1000 * 10, 3000 * 10, TimeUnit.MILLISECONDS);
+		} 
+	}
+
+	private void initBeasInfo() throws Exception {
+		this.clusterInfo = this.client.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis);
+		this.topicList = this.client.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
+	}
+
+	private URL createProviderURL(ServiceName serviceName, URL url, int queue) {
+		URL providerURL = new URL("rocketmq", this.nameservAddr, this.nameservPort, serviceName.getServiceInterface());
+		providerURL.addParameters(url.getParameters());
+
+		providerURL.addParameter(CommonConstants.INTERFACE_KEY, serviceName.getServiceInterface());
+		providerURL.addParameter(CommonConstants.PATH_KEY, serviceName.getServiceInterface());
+		providerURL.addParameter("bean.name", "ServiceBean:" + serviceName.getServiceInterface());
+		providerURL.addParameter(CommonConstants.SIDE_KEY, CommonConstants.PROVIDER);
+		providerURL.addParameter(RegistryConstants.CATEGORY_KEY, "providers");
+		providerURL.addParameter(CommonConstants.PROTOCOL_KEY, "rocketmq");
+		providerURL.addParameter("queueId", queue);
+		providerURL.addParameter("topic", serviceName.getValue());
+		providerURL.addParameter("groupModel", this.groupModel);
+		return providerURL;
+	}
+	
+	private ServiceName createServiceName(URL url) {
+		return new ServiceName(url,this.groupModel);
+	}
+
+	private boolean isAdminProtocol(URL url) {
+		return ADMIN_PROTOCOL.equals(url.getProtocol());
+	}
+
+	private boolean createTopic(ServiceName serviceName) {
+		if (!this.topicList.getTopicList().contains(serviceName.getValue())) {
+			for (Entry<String, BrokerData> entry : this.clusterInfo.getBrokerAddrTable().entrySet()) {
+				String brokerArr = entry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+				try {
+					TopicConfig topicConfig = new TopicConfig(serviceName.getValue());
+					topicConfig.setReadQueueNums(8);
+					topicConfig.setWriteQueueNums(8);
+					this.client.getMQClientAPIImpl().createTopic(brokerArr, null, topicConfig, timeoutMillis);
+				} catch (Exception e) {
+					logger.error(e.getMessage(), e);
+				}
+			}
+			return true;
+		} else {
+			return false;
+		}
+
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return false;
+	}
+
+	@Override
+	public void doRegister(URL url) {
+		ServiceName serviceName = this.createServiceName(url);
+		this.createTopic(serviceName);
+		url.addParameter("namesrv", this.nameservAddr+":"+this.nameservPort);
+		url.addParameter("topic", serviceName.getValue());
+		url.addParameter("groupModel", this.groupModel);

Review Comment:
   will not tack effect.



##########
dubbo-registry-extensions/pom.xml:
##########
@@ -14,10 +14,7 @@
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  --><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

Review Comment:
   revert this line



##########
dubbo-rpc-extensions/dubbo-rpc-rocket/pom.xml:
##########
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.dubbo.extensions</groupId>
+		<artifactId>dubbo-rpc-extensions</artifactId>
+		<version>${revision}</version>
+	</parent>
+	<artifactId>dubbo-rpc-rocket</artifactId>
+	<name>dubbo-rpc-rocket</name>
+	<url>http://maven.apache.org</url>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>

Review Comment:
   remove these lines



##########
dubbo-api-docs/pom.xml:
##########
@@ -216,7 +216,7 @@
                             <goal>clean</goal>
                         </goals>
                     </execution>
-                </executions>
+                </executions> -->

Review Comment:
   recover this line



##########
dubbo-api-docs/pom.xml:
##########
@@ -189,7 +189,7 @@
                         </goals>
                     </execution>
                 </executions>
-            </plugin>
+            </plugin> -->

Review Comment:
   recover this line



##########
dubbo-registry-extensions/dubbo-registry-nameservice/pom.xml:
##########
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<project

Review Comment:
   Please add ASF license header



##########
dubbo-registry-extensions/dubbo-registry-nameservice/pom.xml:
##########
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.dubbo.extensions</groupId>
+		<artifactId>dubbo-registry-extensions</artifactId>
+		<version>${revision}</version>
+	</parent>
+	<artifactId>dubbo-registry-nameservice</artifactId>
+	<name>dubbo-registry-nameservice</name>
+	<url>http://maven.apache.org</url>

Review Comment:
   remove this line



##########
dubbo-registry-extensions/dubbo-registry-nameservice/pom.xml:
##########
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.dubbo.extensions</groupId>
+		<artifactId>dubbo-registry-extensions</artifactId>
+		<version>${revision}</version>
+	</parent>
+	<artifactId>dubbo-registry-nameservice</artifactId>
+	<name>dubbo-registry-nameservice</name>
+	<url>http://maven.apache.org</url>
+	<properties>

Review Comment:
   add version tag.
   1.0.0-SNAPSHOT



##########
dubbo-registry-extensions/dubbo-registry-nameservice/pom.xml:
##########
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.dubbo.extensions</groupId>
+		<artifactId>dubbo-registry-extensions</artifactId>
+		<version>${revision}</version>
+	</parent>
+	<artifactId>dubbo-registry-nameservice</artifactId>
+	<name>dubbo-registry-nameservice</name>
+	<url>http://maven.apache.org</url>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.dubbo</groupId>
+			<artifactId>dubbo-registry-api</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.dubbo</groupId>
+			<artifactId>dubbo-common</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-client</artifactId>
+			<version>4.9.3</version>

Review Comment:
   version should be managed by `dubbo-extensions-dependencies-bom`



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/test/java/org/apache/dubbo/registry/nameservice/AppTest.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.dubbo.registry.nameservice;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;

Review Comment:
   use junit 5



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/main/java/org/apache/dubbo/registry/nameservice/NameServiceRegistry.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.dubbo.registry.nameservice;
+
+import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.constants.RegistryConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+
+public class NameServiceRegistry extends FailbackRegistry {
+
+	private final Logger logger = LoggerFactory.getLogger(getClass());
+
+	private ScheduledExecutorService scheduledExecutorService;
+
+	private Map<URL, RegistryInfoWrapper> consumerRegistryInfoWrapperMap = new ConcurrentHashMap<>();
+
+	private MQClientInstance client;
+
+	private boolean isNotRoute = true;
+
+	private ClusterInfo clusterInfo;
+
+	private TopicList topicList;
+
+	private long timeoutMillis;
+	
+	private String nameservAddr;
+	
+	private Integer nameservPort;
+	
+	private String groupModel;
+	
+	private String instanceName;
+
+	public NameServiceRegistry(URL url) {
+		super(url);
+		this.nameservAddr = url.getHost();
+		this.nameservPort = url.getPort();
+		this.isNotRoute = url.getParameter("route", true);
+		if (this.isNotRoute) {
+			this.groupModel = url.getParameter("groupModel", "select");
+			this.timeoutMillis = url.getParameter("timeoutMillis", 3000);
+			this.instanceName = url.getParameter("instanceName");
+			ClientConfig clientConfig = new ClientConfig();
+			clientConfig.setNamesrvAddr( this.nameservAddr );
+			clientConfig.setInstanceName(instanceName);
+			client = MQClientManager.getInstance().getOrCreateMQClientInstance(clientConfig);
+			try {
+				this.initBeasInfo();
+			} catch (Exception e) {
+				throw new RuntimeException(e);

Review Comment:
   add log



##########
pom.xml:
##########
@@ -588,7 +588,7 @@
                                 </goals>
                             </execution>
                         </executions>
-                    </plugin>
+                    </plugin> -->

Review Comment:
   revert this line



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/main/java/org/apache/dubbo/registry/nameservice/App.java:
##########
@@ -0,0 +1,13 @@
+package org.apache.dubbo.registry.nameservice;
+
+/**
+ * Hello world!
+ *
+ */
+public class App 
+{
+    public static void main( String[] args )
+    {
+        System.out.println( "Hello World!" );
+    }
+}

Review Comment:
   remove this class



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/main/java/org/apache/dubbo/registry/nameservice/NameServiceRegistry.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.dubbo.registry.nameservice;
+
+import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.constants.RegistryConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+
+public class NameServiceRegistry extends FailbackRegistry {
+
+	private final Logger logger = LoggerFactory.getLogger(getClass());
+
+	private ScheduledExecutorService scheduledExecutorService;
+
+	private Map<URL, RegistryInfoWrapper> consumerRegistryInfoWrapperMap = new ConcurrentHashMap<>();
+
+	private MQClientInstance client;
+
+	private boolean isNotRoute = true;
+
+	private ClusterInfo clusterInfo;
+
+	private TopicList topicList;
+
+	private long timeoutMillis;
+	
+	private String nameservAddr;
+	
+	private Integer nameservPort;
+	
+	private String groupModel;
+	
+	private String instanceName;
+
+	public NameServiceRegistry(URL url) {
+		super(url);
+		this.nameservAddr = url.getHost();
+		this.nameservPort = url.getPort();
+		this.isNotRoute = url.getParameter("route", true);
+		if (this.isNotRoute) {
+			this.groupModel = url.getParameter("groupModel", "select");
+			this.timeoutMillis = url.getParameter("timeoutMillis", 3000);
+			this.instanceName = url.getParameter("instanceName");
+			ClientConfig clientConfig = new ClientConfig();
+			clientConfig.setNamesrvAddr( this.nameservAddr );
+			clientConfig.setInstanceName(instanceName);
+			client = MQClientManager.getInstance().getOrCreateMQClientInstance(clientConfig);
+			try {
+				this.initBeasInfo();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+				@Override
+				public Thread newThread(Runnable r) {
+					return new Thread(r, "dubbo-registry-nameservice");
+				}
+			});
+			scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						NameServiceRegistry.this.initBeasInfo();
+
+						if (consumerRegistryInfoWrapperMap.isEmpty()) {
+							return;
+						}
+						for (Entry<URL, RegistryInfoWrapper> e : consumerRegistryInfoWrapperMap.entrySet()) {
+							List<URL> urls = new ArrayList<URL>();
+							NameServiceRegistry.this.pullRoute(e.getValue().serviceName, e.getKey(), urls);
+							e.getValue().listener.notify(urls);
+						}
+					} catch (Exception e) {
+						logger.error("ScheduledTask pullRoute exception", e);
+					}
+				}
+			}, 1000 * 10, 3000 * 10, TimeUnit.MILLISECONDS);

Review Comment:
   What is the purpose of this job



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/main/java/org/apache/dubbo/registry/nameservice/NameServiceRegistry.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.dubbo.registry.nameservice;
+
+import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.constants.RegistryConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+
+public class NameServiceRegistry extends FailbackRegistry {
+
+	private final Logger logger = LoggerFactory.getLogger(getClass());
+
+	private ScheduledExecutorService scheduledExecutorService;
+
+	private Map<URL, RegistryInfoWrapper> consumerRegistryInfoWrapperMap = new ConcurrentHashMap<>();
+
+	private MQClientInstance client;
+
+	private boolean isNotRoute = true;
+
+	private ClusterInfo clusterInfo;
+
+	private TopicList topicList;
+
+	private long timeoutMillis;
+	
+	private String nameservAddr;
+	
+	private Integer nameservPort;
+	
+	private String groupModel;
+	
+	private String instanceName;
+
+	public NameServiceRegistry(URL url) {
+		super(url);
+		this.nameservAddr = url.getHost();
+		this.nameservPort = url.getPort();
+		this.isNotRoute = url.getParameter("route", true);
+		if (this.isNotRoute) {

Review Comment:
   the field name is inappropriate



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/main/java/org/apache/dubbo/registry/nameservice/NameServiceRegistry.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.dubbo.registry.nameservice;
+
+import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.constants.RegistryConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+
+public class NameServiceRegistry extends FailbackRegistry {
+
+	private final Logger logger = LoggerFactory.getLogger(getClass());
+
+	private ScheduledExecutorService scheduledExecutorService;
+
+	private Map<URL, RegistryInfoWrapper> consumerRegistryInfoWrapperMap = new ConcurrentHashMap<>();
+
+	private MQClientInstance client;
+
+	private boolean isNotRoute = true;
+
+	private ClusterInfo clusterInfo;
+
+	private TopicList topicList;
+
+	private long timeoutMillis;
+	
+	private String nameservAddr;
+	
+	private Integer nameservPort;
+	
+	private String groupModel;
+	
+	private String instanceName;
+
+	public NameServiceRegistry(URL url) {
+		super(url);
+		this.nameservAddr = url.getHost();
+		this.nameservPort = url.getPort();
+		this.isNotRoute = url.getParameter("route", true);
+		if (this.isNotRoute) {
+			this.groupModel = url.getParameter("groupModel", "select");
+			this.timeoutMillis = url.getParameter("timeoutMillis", 3000);
+			this.instanceName = url.getParameter("instanceName");
+			ClientConfig clientConfig = new ClientConfig();
+			clientConfig.setNamesrvAddr( this.nameservAddr );
+			clientConfig.setInstanceName(instanceName);
+			client = MQClientManager.getInstance().getOrCreateMQClientInstance(clientConfig);
+			try {
+				this.initBeasInfo();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+				@Override
+				public Thread newThread(Runnable r) {
+					return new Thread(r, "dubbo-registry-nameservice");
+				}
+			});
+			scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						NameServiceRegistry.this.initBeasInfo();
+
+						if (consumerRegistryInfoWrapperMap.isEmpty()) {
+							return;
+						}
+						for (Entry<URL, RegistryInfoWrapper> e : consumerRegistryInfoWrapperMap.entrySet()) {
+							List<URL> urls = new ArrayList<URL>();
+							NameServiceRegistry.this.pullRoute(e.getValue().serviceName, e.getKey(), urls);
+							e.getValue().listener.notify(urls);
+						}
+					} catch (Exception e) {
+						logger.error("ScheduledTask pullRoute exception", e);
+					}
+				}
+			}, 1000 * 10, 3000 * 10, TimeUnit.MILLISECONDS);
+		} 
+	}
+
+	private void initBeasInfo() throws Exception {
+		this.clusterInfo = this.client.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis);
+		this.topicList = this.client.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
+	}
+
+	private URL createProviderURL(ServiceName serviceName, URL url, int queue) {
+		URL providerURL = new URL("rocketmq", this.nameservAddr, this.nameservPort, serviceName.getServiceInterface());
+		providerURL.addParameters(url.getParameters());
+
+		providerURL.addParameter(CommonConstants.INTERFACE_KEY, serviceName.getServiceInterface());
+		providerURL.addParameter(CommonConstants.PATH_KEY, serviceName.getServiceInterface());
+		providerURL.addParameter("bean.name", "ServiceBean:" + serviceName.getServiceInterface());
+		providerURL.addParameter(CommonConstants.SIDE_KEY, CommonConstants.PROVIDER);
+		providerURL.addParameter(RegistryConstants.CATEGORY_KEY, "providers");
+		providerURL.addParameter(CommonConstants.PROTOCOL_KEY, "rocketmq");
+		providerURL.addParameter("queueId", queue);
+		providerURL.addParameter("topic", serviceName.getValue());
+		providerURL.addParameter("groupModel", this.groupModel);
+		return providerURL;
+	}
+	
+	private ServiceName createServiceName(URL url) {
+		return new ServiceName(url,this.groupModel);
+	}
+
+	private boolean isAdminProtocol(URL url) {
+		return ADMIN_PROTOCOL.equals(url.getProtocol());
+	}
+
+	private boolean createTopic(ServiceName serviceName) {
+		if (!this.topicList.getTopicList().contains(serviceName.getValue())) {
+			for (Entry<String, BrokerData> entry : this.clusterInfo.getBrokerAddrTable().entrySet()) {
+				String brokerArr = entry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+				try {
+					TopicConfig topicConfig = new TopicConfig(serviceName.getValue());
+					topicConfig.setReadQueueNums(8);
+					topicConfig.setWriteQueueNums(8);
+					this.client.getMQClientAPIImpl().createTopic(brokerArr, null, topicConfig, timeoutMillis);
+				} catch (Exception e) {
+					logger.error(e.getMessage(), e);
+				}
+			}
+			return true;
+		} else {
+			return false;
+		}
+
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return false;
+	}
+
+	@Override
+	public void doRegister(URL url) {
+		ServiceName serviceName = this.createServiceName(url);
+		this.createTopic(serviceName);
+		url.addParameter("namesrv", this.nameservAddr+":"+this.nameservPort);
+		url.addParameter("topic", serviceName.getValue());
+		url.addParameter("groupModel", this.groupModel);

Review Comment:
   should not change url in registry



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/main/java/org/apache/dubbo/registry/nameservice/NameServiceRegistry.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.dubbo.registry.nameservice;
+
+import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.constants.RegistryConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+
+public class NameServiceRegistry extends FailbackRegistry {
+
+	private final Logger logger = LoggerFactory.getLogger(getClass());
+
+	private ScheduledExecutorService scheduledExecutorService;
+
+	private Map<URL, RegistryInfoWrapper> consumerRegistryInfoWrapperMap = new ConcurrentHashMap<>();
+
+	private MQClientInstance client;
+
+	private boolean isNotRoute = true;
+
+	private ClusterInfo clusterInfo;
+
+	private TopicList topicList;
+
+	private long timeoutMillis;
+	
+	private String nameservAddr;
+	
+	private Integer nameservPort;
+	
+	private String groupModel;
+	
+	private String instanceName;
+
+	public NameServiceRegistry(URL url) {
+		super(url);
+		this.nameservAddr = url.getHost();
+		this.nameservPort = url.getPort();
+		this.isNotRoute = url.getParameter("route", true);
+		if (this.isNotRoute) {
+			this.groupModel = url.getParameter("groupModel", "select");
+			this.timeoutMillis = url.getParameter("timeoutMillis", 3000);
+			this.instanceName = url.getParameter("instanceName");
+			ClientConfig clientConfig = new ClientConfig();
+			clientConfig.setNamesrvAddr( this.nameservAddr );
+			clientConfig.setInstanceName(instanceName);
+			client = MQClientManager.getInstance().getOrCreateMQClientInstance(clientConfig);
+			try {
+				this.initBeasInfo();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+				@Override
+				public Thread newThread(Runnable r) {
+					return new Thread(r, "dubbo-registry-nameservice");
+				}
+			});
+			scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						NameServiceRegistry.this.initBeasInfo();
+
+						if (consumerRegistryInfoWrapperMap.isEmpty()) {
+							return;
+						}
+						for (Entry<URL, RegistryInfoWrapper> e : consumerRegistryInfoWrapperMap.entrySet()) {
+							List<URL> urls = new ArrayList<URL>();
+							NameServiceRegistry.this.pullRoute(e.getValue().serviceName, e.getKey(), urls);
+							e.getValue().listener.notify(urls);
+						}
+					} catch (Exception e) {
+						logger.error("ScheduledTask pullRoute exception", e);
+					}
+				}
+			}, 1000 * 10, 3000 * 10, TimeUnit.MILLISECONDS);
+		} 
+	}
+
+	private void initBeasInfo() throws Exception {
+		this.clusterInfo = this.client.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis);
+		this.topicList = this.client.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
+	}
+
+	private URL createProviderURL(ServiceName serviceName, URL url, int queue) {
+		URL providerURL = new URL("rocketmq", this.nameservAddr, this.nameservPort, serviceName.getServiceInterface());
+		providerURL.addParameters(url.getParameters());
+
+		providerURL.addParameter(CommonConstants.INTERFACE_KEY, serviceName.getServiceInterface());
+		providerURL.addParameter(CommonConstants.PATH_KEY, serviceName.getServiceInterface());
+		providerURL.addParameter("bean.name", "ServiceBean:" + serviceName.getServiceInterface());
+		providerURL.addParameter(CommonConstants.SIDE_KEY, CommonConstants.PROVIDER);
+		providerURL.addParameter(RegistryConstants.CATEGORY_KEY, "providers");
+		providerURL.addParameter(CommonConstants.PROTOCOL_KEY, "rocketmq");
+		providerURL.addParameter("queueId", queue);
+		providerURL.addParameter("topic", serviceName.getValue());
+		providerURL.addParameter("groupModel", this.groupModel);

Review Comment:
   url is copy on write
   
   use `providerURL.addParameter(xxx);`



##########
dubbo-rpc-extensions/dubbo-rpc-rocket/pom.xml:
##########
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.dubbo.extensions</groupId>
+		<artifactId>dubbo-rpc-extensions</artifactId>
+		<version>${revision}</version>
+	</parent>
+	<artifactId>dubbo-rpc-rocket</artifactId>
+	<name>dubbo-rpc-rocket</name>

Review Comment:
   rename to `dubbo-rpc-rocketmq`



##########
pom.xml:
##########
@@ -529,7 +529,7 @@
             </activation>
             <build>
                 <plugins>
-                    <plugin>
+                    <!-- <plugin>

Review Comment:
   revert this line



##########
dubbo-rpc-extensions/dubbo-rpc-rocket/pom.xml:
##########
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.dubbo.extensions</groupId>
+		<artifactId>dubbo-rpc-extensions</artifactId>
+		<version>${revision}</version>
+	</parent>
+	<artifactId>dubbo-rpc-rocket</artifactId>
+	<name>dubbo-rpc-rocket</name>
+	<url>http://maven.apache.org</url>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.dubbo</groupId>
+			<artifactId>dubbo-rpc-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.dubbo</groupId>
+			<artifactId>dubbo-rpc-dubbo</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.rocketmq</groupId>
+			<artifactId>rocketmq-client</artifactId>
+			<version>4.9.2</version>

Review Comment:
   version should be managed by `dubbo-extensions-dependencies-bom`



##########
dubbo-registry-extensions/dubbo-registry-nameservice/src/main/java/org/apache/dubbo/registry/nameservice/NameServiceRegistry.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.dubbo.registry.nameservice;
+
+import static org.apache.dubbo.registry.Constants.ADMIN_PROTOCOL;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.constants.RegistryConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.support.FailbackRegistry;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+
+public class NameServiceRegistry extends FailbackRegistry {
+
+	private final Logger logger = LoggerFactory.getLogger(getClass());
+
+	private ScheduledExecutorService scheduledExecutorService;
+
+	private Map<URL, RegistryInfoWrapper> consumerRegistryInfoWrapperMap = new ConcurrentHashMap<>();
+
+	private MQClientInstance client;
+
+	private boolean isNotRoute = true;
+
+	private ClusterInfo clusterInfo;
+
+	private TopicList topicList;
+
+	private long timeoutMillis;
+	
+	private String nameservAddr;
+	
+	private Integer nameservPort;
+	
+	private String groupModel;
+	
+	private String instanceName;
+
+	public NameServiceRegistry(URL url) {
+		super(url);
+		this.nameservAddr = url.getHost();
+		this.nameservPort = url.getPort();
+		this.isNotRoute = url.getParameter("route", true);
+		if (this.isNotRoute) {
+			this.groupModel = url.getParameter("groupModel", "select");
+			this.timeoutMillis = url.getParameter("timeoutMillis", 3000);
+			this.instanceName = url.getParameter("instanceName");
+			ClientConfig clientConfig = new ClientConfig();
+			clientConfig.setNamesrvAddr( this.nameservAddr );
+			clientConfig.setInstanceName(instanceName);
+			client = MQClientManager.getInstance().getOrCreateMQClientInstance(clientConfig);
+			try {
+				this.initBeasInfo();
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+			this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+				@Override
+				public Thread newThread(Runnable r) {
+					return new Thread(r, "dubbo-registry-nameservice");
+				}
+			});
+			scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						NameServiceRegistry.this.initBeasInfo();
+
+						if (consumerRegistryInfoWrapperMap.isEmpty()) {
+							return;
+						}
+						for (Entry<URL, RegistryInfoWrapper> e : consumerRegistryInfoWrapperMap.entrySet()) {
+							List<URL> urls = new ArrayList<URL>();
+							NameServiceRegistry.this.pullRoute(e.getValue().serviceName, e.getKey(), urls);
+							e.getValue().listener.notify(urls);
+						}
+					} catch (Exception e) {
+						logger.error("ScheduledTask pullRoute exception", e);
+					}
+				}
+			}, 1000 * 10, 3000 * 10, TimeUnit.MILLISECONDS);
+		} 
+	}
+
+	private void initBeasInfo() throws Exception {
+		this.clusterInfo = this.client.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis);
+		this.topicList = this.client.getMQClientAPIImpl().getTopicListFromNameServer(timeoutMillis);
+	}
+
+	private URL createProviderURL(ServiceName serviceName, URL url, int queue) {
+		URL providerURL = new URL("rocketmq", this.nameservAddr, this.nameservPort, serviceName.getServiceInterface());
+		providerURL.addParameters(url.getParameters());
+
+		providerURL.addParameter(CommonConstants.INTERFACE_KEY, serviceName.getServiceInterface());
+		providerURL.addParameter(CommonConstants.PATH_KEY, serviceName.getServiceInterface());
+		providerURL.addParameter("bean.name", "ServiceBean:" + serviceName.getServiceInterface());
+		providerURL.addParameter(CommonConstants.SIDE_KEY, CommonConstants.PROVIDER);
+		providerURL.addParameter(RegistryConstants.CATEGORY_KEY, "providers");
+		providerURL.addParameter(CommonConstants.PROTOCOL_KEY, "rocketmq");
+		providerURL.addParameter("queueId", queue);
+		providerURL.addParameter("topic", serviceName.getValue());
+		providerURL.addParameter("groupModel", this.groupModel);
+		return providerURL;
+	}
+	
+	private ServiceName createServiceName(URL url) {
+		return new ServiceName(url,this.groupModel);
+	}
+
+	private boolean isAdminProtocol(URL url) {
+		return ADMIN_PROTOCOL.equals(url.getProtocol());
+	}
+
+	private boolean createTopic(ServiceName serviceName) {
+		if (!this.topicList.getTopicList().contains(serviceName.getValue())) {
+			for (Entry<String, BrokerData> entry : this.clusterInfo.getBrokerAddrTable().entrySet()) {
+				String brokerArr = entry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
+				try {
+					TopicConfig topicConfig = new TopicConfig(serviceName.getValue());
+					topicConfig.setReadQueueNums(8);
+					topicConfig.setWriteQueueNums(8);
+					this.client.getMQClientAPIImpl().createTopic(brokerArr, null, topicConfig, timeoutMillis);
+				} catch (Exception e) {
+					logger.error(e.getMessage(), e);
+				}
+			}
+			return true;
+		} else {
+			return false;
+		}
+
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return false;
+	}
+
+	@Override
+	public void doRegister(URL url) {
+		ServiceName serviceName = this.createServiceName(url);
+		this.createTopic(serviceName);
+		url.addParameter("namesrv", this.nameservAddr+":"+this.nameservPort);
+		url.addParameter("topic", serviceName.getValue());
+		url.addParameter("groupModel", this.groupModel);
+	}
+
+	@Override
+	public void doUnregister(URL url) {
+
+	}
+
+	@Override
+	public void doSubscribe(URL url, NotifyListener listener) {
+		List<URL> urls = new ArrayList<URL>();
+		ServiceName serviceName = this.createServiceName(url);
+		if (this.isNotRoute) {
+			URL providerURL = this.createProviderURL(serviceName, url, -1);
+			urls.add(providerURL);
+		} else {
+			this.pullRoute(serviceName, url, urls);
+		}
+		listener.notify(urls);
+	}
+
+	void pullRoute(ServiceName serviceName, URL url, List<URL> urls) {
+		try {
+			this.createTopic(serviceName);
+			String topic = serviceName.getValue();
+			TopicRouteData topicRouteData = this.client.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic,
+					this.timeoutMillis);
+
+			Map<String, String> brokerAddrBybrokerName = new HashMap<>();
+			for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+				brokerAddrBybrokerName.put(brokerData.getBrokerName(), brokerData.selectBrokerAddr());
+			}
+			for (QueueData queueData : topicRouteData.getQueueDatas()) {
+				if (PermName.isReadable(queueData.getPerm())) {
+					for (int i = 0; i < queueData.getReadQueueNums(); i++) {
+						URL newUrl = this.createProviderURL(serviceName, url, i);
+						newUrl.addParameter("brokerName", queueData.getBrokerName());
+						urls.add(newUrl);
+					}
+				}
+			}
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void doUnsubscribe(URL url, NotifyListener listener) {
+		this.consumerRegistryInfoWrapperMap.remove(url);
+	}
+
+	private class RegistryInfoWrapper {
+
+		private URL url;
+
+		private NotifyListener listener;
+
+		private ServiceName serviceName;
+	}

Review Comment:
   use setter and getter instead



##########
dubbo-rpc-extensions/dubbo-rpc-rocket/pom.xml:
##########
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.dubbo.extensions</groupId>
+		<artifactId>dubbo-rpc-extensions</artifactId>
+		<version>${revision}</version>
+	</parent>
+	<artifactId>dubbo-rpc-rocket</artifactId>
+	<name>dubbo-rpc-rocket</name>
+	<url>http://maven.apache.org</url>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.dubbo</groupId>
+			<artifactId>dubbo-rpc-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.dubbo</groupId>
+			<artifactId>dubbo-rpc-dubbo</artifactId>
+		</dependency>

Review Comment:
   should not depend on this module



##########
dubbo-rpc-extensions/dubbo-rpc-rocket/src/main/java/org/apache/dubbo/rpc/rocketmq/RocketMQProtocolModel.java:
##########
@@ -0,0 +1,6 @@
+package org.apache.dubbo.rpc.rocketmq;
+
+public enum RocketMQProtocolModel {
+
+	
+}

Review Comment:
   What is the purpose of designing this class



##########
pom.xml:
##########
@@ -381,7 +381,7 @@
                     <flattenMode>resolveCiFriendliesOnly</flattenMode>
                 </configuration>
                 <executions>
-                    <execution>
+                    <!-- <execution>

Review Comment:
   revert this line



##########
dubbo-rpc-extensions/dubbo-rpc-rocket/src/main/java/org/apache/dubbo/rpc/rocketmq/RocketMQInvoker.java:
##########
@@ -0,0 +1,201 @@
+package org.apache.dubbo.rpc.rocketmq;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.buffer.DynamicChannelBuffer;
+import org.apache.dubbo.remoting.buffer.HeapChannelBuffer;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
+import org.apache.dubbo.rpc.AppResponse;
+import org.apache.dubbo.rpc.AsyncRpcResult;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.protocol.AbstractInvoker;
+import org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec;
+import org.apache.dubbo.rpc.support.RpcUtils;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.RequestCallback;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+
+
+public class RocketMQInvoker<T> extends AbstractInvoker<T> {
+
+	
+	
+	private DubboCountCodec dubboCountCodec = new DubboCountCodec(FrameworkModel.defaultModel());
+	
+	private final ReentrantLock destroyLock = new ReentrantLock();
+
+	private DefaultMQProducer defaultMQProducer;
+
+	private final String version;
+	
+	private String group;
+	
+	private MessageQueue messageQueue;
+	
+	private Channel channel;
+
+	private String topic;
+	
+	private String groupModel;
+	
+	
+	
+	public RocketMQInvoker(Class<T> type, URL url,RocketMQProtocolServer rocketMQProtocolServer) {
+		super(type, url);
+		this.version = url.getParameter(CommonConstants.VERSION_KEY);
+		this.group = url.getParameter(CommonConstants.GROUP_KEY);
+		this.groupModel = url.getParameter("groupModel");
+		this.defaultMQProducer = rocketMQProtocolServer.getDefaultMQProducer();
+		this.topic = url.getParameter("topic");
+		Integer queueId = url.getParameter("queueId",Integer.class,-1);
+		if( queueId != -1) {
+			messageQueue = new MessageQueue();
+			messageQueue.setBrokerName(url.getParameter("brokerName"));
+			messageQueue.setTopic(this.topic);
+			messageQueue.setQueueId(queueId);
+		}
+	}
+	
+
+	@Override
+	protected Result doInvoke(Invocation invocation) throws Throwable {
+		RpcInvocation inv = (RpcInvocation) invocation;
+		final String methodName = RpcUtils.getMethodName(invocation);
+		inv.setAttachment(PATH_KEY, getUrl().getPath());
+		inv.setAttachment(VERSION_KEY, version);
+		// 直连

Review Comment:
   use eng comment



##########
dubbo-rpc-extensions/dubbo-rpc-rocket/pom.xml:
##########
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.apache.dubbo.extensions</groupId>
+		<artifactId>dubbo-rpc-extensions</artifactId>
+		<version>${revision}</version>
+	</parent>
+	<artifactId>dubbo-rpc-rocket</artifactId>
+	<name>dubbo-rpc-rocket</name>
+	<url>http://maven.apache.org</url>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+	<dependencies>

Review Comment:
   add version
   1.0.0-SNAPSHOT



##########
pom.xml:
##########
@@ -394,7 +394,7 @@
                         <goals>
                             <goal>clean</goal>
                         </goals>
-                    </execution>
+                    </execution> -->

Review Comment:
   revert this line



##########
dubbo-rpc-extensions/dubbo-rpc-rocket/src/main/java/org/apache/dubbo/rpc/rocketmq/RocketMQInvoker.java:
##########
@@ -0,0 +1,201 @@
+package org.apache.dubbo.rpc.rocketmq;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.buffer.DynamicChannelBuffer;
+import org.apache.dubbo.remoting.buffer.HeapChannelBuffer;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture;
+import org.apache.dubbo.rpc.AppResponse;
+import org.apache.dubbo.rpc.AsyncRpcResult;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.protocol.AbstractInvoker;
+import org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec;

Review Comment:
   should not depend on this class



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org