You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by se...@apache.org on 2021/12/08 06:31:45 UTC

[rocketmq-streams] branch main updated: polish(window) add default rebalance strategy

This is an automated email from the ASF dual-hosted git repository.

seraph pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fff972  polish(window) add default rebalance strategy
     new d0d88fb  Merge pull request #98 from duhenglucky/main_strategy
8fff972 is described below

commit 8fff972c44bc240c8d7aaa0255ed90e46cdc0b5e
Author: duhenglucky <du...@apache.org>
AuthorDate: Wed Dec 8 14:12:06 2021 +0800

    polish(window) add default rebalance strategy
---
 .../rocketmq/streams/source/RocketMQSource.java    |  2 +-
 .../service/AbstractConfigurableService.java       | 45 +++++++---------------
 2 files changed, 14 insertions(+), 33 deletions(-)

diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 3bd7849..5966fee 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -78,7 +78,7 @@ public class RocketMQSource extends AbstractSupportShuffleSource {
 
     protected Long pullIntervalMs;
 
-    protected String strategyName;
+    protected String strategyName = STRATEGY_AVERAGE;
 
     protected transient DefaultMQPushConsumer consumer;
     protected transient ConsumeFromWhere consumeFromWhere;//默认从哪里消费,不会被持久化。不设置默认从尾部消费
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
index 5bf44da..aa7037a 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.streams.configurable.service;
 
 import com.alibaba.fastjson.JSONObject;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -28,7 +27,6 @@ import java.util.Properties;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.component.AbstractComponent;
@@ -94,7 +92,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
 
     protected boolean equals(String key, List<?> newConfigureList) {
         for (Object o : newConfigureList) {
-            IConfigurable configure = (IConfigurable)o;
+            IConfigurable configure = (IConfigurable) o;
             String tempKey = getConfigureKey(configure.getNameSpace(), configure.getType(), configure.getConfigureName());
             if (key.equals(tempKey)) {
                 IConfigurable oldConfigure = configurableMap.get(key);
@@ -115,31 +113,24 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
         }
         List<T> result = new ArrayList<T>();
         for (IConfigurable configurable : list) {
-            result.add((T)configurable);
+            result.add((T) configurable);
         }
         return result;
     }
 
     @Override
     public boolean refreshConfigurable(String namespace) {
-        //每次刷新,重新刷新配置文件
-        //if(ComponentCreator.propertiesPath!=null){
-        //    ComponentCreator.setProperties(ComponentCreator.propertiesPath);
-        //}
+
         this.namespace = namespace;
-        // Map<String, List<IConfigurable>> namespace2ConfigurableMap = new HashMap<>();
         Map<String, List<IConfigurable>> tempType2ConfigurableMap = new HashMap<>();
         Map<String, IConfigurable> tempName2ConfigurableMap = new HashMap<>();
         GetConfigureResult configures = loadConfigurable(namespace);
-        // updateConfiguresCache(configures.getConfigure());
         if (configures != null && configures.isQuerySuccess() && configures.getConfigurables() != null) {
-            // List<Configure> configureList = filterConfigure(configures.getConfigure());
             List<IConfigurable> configurables = configures.getConfigurables();
             List<IConfigurable> configurableList = checkAndUpdateConfigurables(configurables, tempType2ConfigurableMap, tempName2ConfigurableMap);
-            // this.namespace2ConfigurableMap = namespace2ConfigurableMap;
             for (IConfigurable configurable : configurableList) {
                 if (configurable instanceof IAfterConfigurableRefreshListener) {
-                    ((IAfterConfigurableRefreshListener)configurable).doProcessAfterRefreshConfigurable(this);
+                    ((IAfterConfigurableRefreshListener) configurable).doProcessAfterRefreshConfigurable(this);
                 }
             }
             return true;
@@ -149,10 +140,12 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
 
     @Override
     public <T> T queryConfigurable(String configurableType, String name) {
-        return (T)queryConfigurableByIdent(configurableType, name);
+        return (T) queryConfigurableByIdent(configurableType, name);
     }
 
-    protected List<IConfigurable> checkAndUpdateConfigurables(List<IConfigurable> configurables, Map<String, List<IConfigurable>> tempType2ConfigurableMap, Map<String, IConfigurable> tempName2ConfigurableMap) {
+    protected List<IConfigurable> checkAndUpdateConfigurables(List<IConfigurable> configurables,
+        Map<String, List<IConfigurable>> tempType2ConfigurableMap,
+        Map<String, IConfigurable> tempName2ConfigurableMap) {
         List<IConfigurable> configurableList = new ArrayList<>();
         for (IConfigurable configurable : configurables) {
             try {
@@ -185,7 +178,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
 
     private void destroyOldConfigurable(IConfigurable oldConfigurable) {
         if (AbstractConfigurable.class.isInstance(oldConfigurable)) {
-            ((AbstractConfigurable)oldConfigurable).destroy();
+            ((AbstractConfigurable) oldConfigurable).destroy();
         }
         String key = getConfigureKey(oldConfigurable.getNameSpace(), oldConfigurable.getType(),
             oldConfigurable.getConfigureName());
@@ -194,7 +187,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
 
     protected void initConfigurable(IConfigurable configurable) {
         if (AbstractConfigurable.class.isInstance(configurable)) {
-            AbstractConfigurable abstractConfigurable = (AbstractConfigurable)configurable;
+            AbstractConfigurable abstractConfigurable = (AbstractConfigurable) configurable;
             abstractConfigurable.setConfigurableService(this);
         }
 
@@ -202,9 +195,6 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
 
     }
 
-    /**
-     * 内部使用
-     */
     private ScheduledExecutorService scheduledExecutorService;
 
     @Override
@@ -232,10 +222,6 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
             }, polingTime, polingTime, TimeUnit.SECONDS);
         }
     }
-    // @Override
-    // public List<IConfigurable> queryConfigurable(String nameSpace) {
-    // return namespace2ConfigurableMap.get(nameSpace);
-    // }
 
     @Override
     public List<IConfigurable> queryConfigurable(String type) {
@@ -260,7 +246,6 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
 
     @Override
     public void update(IConfigurable configurable) {
-        // update(configurable,name2ConfigurableMap,type2ConfigurableMap);
         updateConfigurable(configurable);
     }
 
@@ -284,7 +269,6 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
             IConfigurable oldConfigurable = this.name2ConfigurableMap.get(nameKey);
             if (equals(configureKey, configurableList)) {
                 configurable = oldConfigurable;
-                // name2ConfigurableMap.put(nameKey, name2ConfigurableMap.get(nameKey));
             } else {
                 destroyOldConfigurable(oldConfigurable);
                 initConfigurable(configurable);
@@ -297,14 +281,12 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
         updateConfiguresCache(configurable);
         name2ConfigurableMap.put(nameKey, configurable);
         String typeKey = MapKeyUtil.createKey(configurable.getType());
-        // put2Map(namespace2ConfigurableMap, namespace, configurable);
         put2Map(type2ConfigurableMap, typeKey, configurable);
         return isUpdate;
     }
 
     @Override
     public void insert(IConfigurable configurable) {
-        // update(configurable,name2ConfigurableMap,type2ConfigurableMap);
         insertConfigurable(configurable);
     }
 
@@ -371,7 +353,6 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
             jsonObject.put(CLASS_NAME, configurable.getClass().getName());
             configure.setJsonValue(jsonObject.toJSONString());
         }
-        // configure.createIdentification();
         return configure;
     }
 
@@ -383,7 +364,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
         }
         Map<String, T> result = new HashMap<String, T>();
         for (IConfigurable configurable : configurables) {
-            result.put(configurable.getConfigureName(), (T)configurable);
+            result.put(configurable.getConfigureName(), (T) configurable);
         }
         return result;
     }
@@ -423,7 +404,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
         configurable.setNameSpace(namespace);
         configurable.setType(type);
         if (AbstractConfigurable.class.isInstance(configurable)) {
-            AbstractConfigurable abstractConfigurable = (AbstractConfigurable)configurable;
+            AbstractConfigurable abstractConfigurable = (AbstractConfigurable) configurable;
             abstractConfigurable.setConfigurableService(this);
         }
         configurable.toObject(jsonValue);
@@ -450,7 +431,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
                     jsonString);
             if (configurable instanceof Entity) {
                 // add by wangtl 20171110 Configurable接口第三方包也在用,故不能Configurable里加接口,只能加到抽象类里,这里强转下
-                Entity abs = (Entity)configurable;
+                Entity abs = (Entity) configurable;
                 abs.setId(configure.getId());
                 abs.setGmtCreate(configure.getGmtCreate());
                 abs.setGmtModified(configure.getGmtModified());