You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by se...@apache.org on 2021/12/08 06:31:45 UTC
[rocketmq-streams] branch main updated: polish(window) add default rebalance strategy
This is an automated email from the ASF dual-hosted git repository.
seraph pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 8fff972 polish(window) add default rebalance strategy
new d0d88fb Merge pull request #98 from duhenglucky/main_strategy
8fff972 is described below
commit 8fff972c44bc240c8d7aaa0255ed90e46cdc0b5e
Author: duhenglucky <du...@apache.org>
AuthorDate: Wed Dec 8 14:12:06 2021 +0800
polish(window) add default rebalance strategy
---
.../rocketmq/streams/source/RocketMQSource.java | 2 +-
.../service/AbstractConfigurableService.java | 45 +++++++---------------
2 files changed, 14 insertions(+), 33 deletions(-)
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 3bd7849..5966fee 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -78,7 +78,7 @@ public class RocketMQSource extends AbstractSupportShuffleSource {
protected Long pullIntervalMs;
- protected String strategyName;
+ protected String strategyName = STRATEGY_AVERAGE;
protected transient DefaultMQPushConsumer consumer;
protected transient ConsumeFromWhere consumeFromWhere;//默认从哪里消费,不会被持久化。不设置默认从尾部消费
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
index 5bf44da..aa7037a 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.streams.configurable.service;
import com.alibaba.fastjson.JSONObject;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -28,7 +27,6 @@ import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.component.AbstractComponent;
@@ -94,7 +92,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
protected boolean equals(String key, List<?> newConfigureList) {
for (Object o : newConfigureList) {
- IConfigurable configure = (IConfigurable)o;
+ IConfigurable configure = (IConfigurable) o;
String tempKey = getConfigureKey(configure.getNameSpace(), configure.getType(), configure.getConfigureName());
if (key.equals(tempKey)) {
IConfigurable oldConfigure = configurableMap.get(key);
@@ -115,31 +113,24 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
}
List<T> result = new ArrayList<T>();
for (IConfigurable configurable : list) {
- result.add((T)configurable);
+ result.add((T) configurable);
}
return result;
}
@Override
public boolean refreshConfigurable(String namespace) {
- //每次刷新,重新刷新配置文件
- //if(ComponentCreator.propertiesPath!=null){
- // ComponentCreator.setProperties(ComponentCreator.propertiesPath);
- //}
+
this.namespace = namespace;
- // Map<String, List<IConfigurable>> namespace2ConfigurableMap = new HashMap<>();
Map<String, List<IConfigurable>> tempType2ConfigurableMap = new HashMap<>();
Map<String, IConfigurable> tempName2ConfigurableMap = new HashMap<>();
GetConfigureResult configures = loadConfigurable(namespace);
- // updateConfiguresCache(configures.getConfigure());
if (configures != null && configures.isQuerySuccess() && configures.getConfigurables() != null) {
- // List<Configure> configureList = filterConfigure(configures.getConfigure());
List<IConfigurable> configurables = configures.getConfigurables();
List<IConfigurable> configurableList = checkAndUpdateConfigurables(configurables, tempType2ConfigurableMap, tempName2ConfigurableMap);
- // this.namespace2ConfigurableMap = namespace2ConfigurableMap;
for (IConfigurable configurable : configurableList) {
if (configurable instanceof IAfterConfigurableRefreshListener) {
- ((IAfterConfigurableRefreshListener)configurable).doProcessAfterRefreshConfigurable(this);
+ ((IAfterConfigurableRefreshListener) configurable).doProcessAfterRefreshConfigurable(this);
}
}
return true;
@@ -149,10 +140,12 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
@Override
public <T> T queryConfigurable(String configurableType, String name) {
- return (T)queryConfigurableByIdent(configurableType, name);
+ return (T) queryConfigurableByIdent(configurableType, name);
}
- protected List<IConfigurable> checkAndUpdateConfigurables(List<IConfigurable> configurables, Map<String, List<IConfigurable>> tempType2ConfigurableMap, Map<String, IConfigurable> tempName2ConfigurableMap) {
+ protected List<IConfigurable> checkAndUpdateConfigurables(List<IConfigurable> configurables,
+ Map<String, List<IConfigurable>> tempType2ConfigurableMap,
+ Map<String, IConfigurable> tempName2ConfigurableMap) {
List<IConfigurable> configurableList = new ArrayList<>();
for (IConfigurable configurable : configurables) {
try {
@@ -185,7 +178,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
private void destroyOldConfigurable(IConfigurable oldConfigurable) {
if (AbstractConfigurable.class.isInstance(oldConfigurable)) {
- ((AbstractConfigurable)oldConfigurable).destroy();
+ ((AbstractConfigurable) oldConfigurable).destroy();
}
String key = getConfigureKey(oldConfigurable.getNameSpace(), oldConfigurable.getType(),
oldConfigurable.getConfigureName());
@@ -194,7 +187,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
protected void initConfigurable(IConfigurable configurable) {
if (AbstractConfigurable.class.isInstance(configurable)) {
- AbstractConfigurable abstractConfigurable = (AbstractConfigurable)configurable;
+ AbstractConfigurable abstractConfigurable = (AbstractConfigurable) configurable;
abstractConfigurable.setConfigurableService(this);
}
@@ -202,9 +195,6 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
}
- /**
- * 内部使用
- */
private ScheduledExecutorService scheduledExecutorService;
@Override
@@ -232,10 +222,6 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
}, polingTime, polingTime, TimeUnit.SECONDS);
}
}
- // @Override
- // public List<IConfigurable> queryConfigurable(String nameSpace) {
- // return namespace2ConfigurableMap.get(nameSpace);
- // }
@Override
public List<IConfigurable> queryConfigurable(String type) {
@@ -260,7 +246,6 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
@Override
public void update(IConfigurable configurable) {
- // update(configurable,name2ConfigurableMap,type2ConfigurableMap);
updateConfigurable(configurable);
}
@@ -284,7 +269,6 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
IConfigurable oldConfigurable = this.name2ConfigurableMap.get(nameKey);
if (equals(configureKey, configurableList)) {
configurable = oldConfigurable;
- // name2ConfigurableMap.put(nameKey, name2ConfigurableMap.get(nameKey));
} else {
destroyOldConfigurable(oldConfigurable);
initConfigurable(configurable);
@@ -297,14 +281,12 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
updateConfiguresCache(configurable);
name2ConfigurableMap.put(nameKey, configurable);
String typeKey = MapKeyUtil.createKey(configurable.getType());
- // put2Map(namespace2ConfigurableMap, namespace, configurable);
put2Map(type2ConfigurableMap, typeKey, configurable);
return isUpdate;
}
@Override
public void insert(IConfigurable configurable) {
- // update(configurable,name2ConfigurableMap,type2ConfigurableMap);
insertConfigurable(configurable);
}
@@ -371,7 +353,6 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
jsonObject.put(CLASS_NAME, configurable.getClass().getName());
configure.setJsonValue(jsonObject.toJSONString());
}
- // configure.createIdentification();
return configure;
}
@@ -383,7 +364,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
}
Map<String, T> result = new HashMap<String, T>();
for (IConfigurable configurable : configurables) {
- result.put(configurable.getConfigureName(), (T)configurable);
+ result.put(configurable.getConfigureName(), (T) configurable);
}
return result;
}
@@ -423,7 +404,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
configurable.setNameSpace(namespace);
configurable.setType(type);
if (AbstractConfigurable.class.isInstance(configurable)) {
- AbstractConfigurable abstractConfigurable = (AbstractConfigurable)configurable;
+ AbstractConfigurable abstractConfigurable = (AbstractConfigurable) configurable;
abstractConfigurable.setConfigurableService(this);
}
configurable.toObject(jsonValue);
@@ -450,7 +431,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
jsonString);
if (configurable instanceof Entity) {
// add by wangtl 20171110 Configurable接口第三方包也在用,故不能Configurable里加接口,只能加到抽象类里,这里强转下
- Entity abs = (Entity)configurable;
+ Entity abs = (Entity) configurable;
abs.setId(configure.getId());
abs.setGmtCreate(configure.getGmtCreate());
abs.setGmtModified(configure.getGmtModified());