You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/08/02 11:19:10 UTC
[rocketmq-streams] 08/27: add channel-configurable
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 1992a3514e723f452b59878be8898d855eab2c7f
Author: vv <ze...@alibaba-inc.com>
AuthorDate: Mon Aug 2 12:04:58 2021 +0800
add channel-configurable
---
rocketmq-streams-configurable/pom.xml | 21 +
.../streams/configuable/ConfigurableComponent.java | 189 +++++++
.../streams/configuable/model/Configure.java | 130 +++++
.../service/AbstractConfigurableService.java | 561 +++++++++++++++++++++
.../AbstractSupportParentConfigureService.java | 263 ++++++++++
.../service/ConfigurableServcieType.java | 31 ++
.../service/ConfigurableServiceFactory.java | 58 +++
.../service/impl/FileConfigureService.java | 250 +++++++++
.../impl/FileSupportParentConfigureService.java | 38 ++
.../service/impl/MemoryConfigureService.java | 122 +++++
.../impl/MemorySupportParentConfigureService.java | 39 ++
.../configurable/ConfigurableComponent.java | 188 +++++++
.../streams/configurable/model/Configure.java | 127 +++++
.../service/AbstractConfigurableService.java | 553 ++++++++++++++++++++
.../AbstractSupportParentConfigureService.java | 255 ++++++++++
.../service/ConfigurableServcieType.java | 29 ++
.../service/ConfigurableServiceFactory.java | 59 +++
.../service/impl/FileConfigureService.java | 249 +++++++++
.../impl/FileSupportParentConfigureService.java | 37 ++
.../service/impl/MemoryConfigureService.java | 121 +++++
.../impl/MemorySupportParentConfigureService.java | 37 ++
.../src/main/resources/log4j.xml | 20 +
.../configuable/ConfiguableComponentTest.java | 112 ++++
.../rocketmq/streams/configuable/model/Person.java | 110 ++++
.../configurable/ConfigurableComponentTest.java | 108 ++++
.../streams/configurable/model/Person.java | 110 ++++
.../src/test/resources/log4j.xml | 20 +
27 files changed, 3837 insertions(+)
diff --git a/rocketmq-streams-configurable/pom.xml b/rocketmq-streams-configurable/pom.xml
new file mode 100755
index 0000000..3f837d8
--- /dev/null
+++ b/rocketmq-streams-configurable/pom.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>rocketmq-streams-configurable</artifactId>
+ <name>ROCKETMQ STREAMS :: configurable</name>
+ <packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-serviceloader</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/ConfigurableComponent.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/ConfigurableComponent.java
new file mode 100644
index 0000000..fbb35ca
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/ConfigurableComponent.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.component.ConfigureDescriptor;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
+import org.apache.rocketmq.streams.configuable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.configuable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.configuable.service.ConfigurableServiceFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * 对Configurable对象,做统一的管理,统一查询,插入和更新。 insert/update 把configuabel对象写入存储,支持文件存储(file),内存存储(memory)和db存储(DB)。可以在配置通过这个ConfigureFileKey.CONNECT_TYPE key 配置 query 是基于内存的查询,对象定时load到内存,可以在属性文件通过这个ConfigureFileKey.POLLING_TIME key配置加载周期,单位是秒 新对象加载后生效,已经存在的对象只有updateFlag发生变化才会被替换
+ */
+public class ConfigurableComponent extends AbstractComponent<IConfigurableService>
+ implements IConfigurableService {
+
+ private static final Log LOG = LogFactory.getLog(ConfigurableComponent.class);
+
+ protected volatile IConfigurableService configureService = null;
+
+ protected transient String namespace;
+
+ public ConfigurableComponent() {
+ initConfigurableServiceDescriptor();
+ addConfigureDescriptor(
+ new ConfigureDescriptor(CONNECT_TYPE, false, ConfigurableServcieType.DEFAULT_SERVICE_NAME));
+ }
+
+ public static ConfigurableComponent getInstance(String namespace) {
+ return ComponentCreator.getComponent(namespace, ConfigurableComponent.class);
+ }
+
+ @Override
+ protected boolean initProperties(Properties properties) {
+ try {
+ if (configureService != null) {
+ return true;
+ }
+ this.configureService = ConfigurableServiceFactory.createConfigurableService(properties);
+ return true;
+ } catch (Exception e) {
+ LOG.error("ConfigurableComponent create error,properties= " + properties, e);
+ return false;
+ }
+
+ }
+
+ @Override
+ public boolean startComponent(String namespace) {
+ try {
+ this.namespace = namespace;
+ configureService.initConfigurables(namespace);
+ return true;
+ } catch (Exception e) {
+ LOG.error("ConfigurableComponent init error, namespace is " + namespace, e);
+ return false;
+ }
+
+ }
+
+ /**
+ * 启动测试模式,用内存数据库存储和加载configurable数据
+ */
+ public static void begineTestMode() {
+ System.setProperty(ConfigurableComponent.CONNECT_TYPE, ConfigurableServcieType.MEMORY_SERVICE_NAME);
+ }
+
+ /**
+ * 关闭测试模式,用配置文件中配置的属性加载configuable数据
+ */
+ public static void endTestMode() {
+ System.clearProperty(ConfigurableComponent.CONNECT_TYPE);
+ }
+
+ @Override
+ public boolean stop() {
+ return true;
+ }
+
+ @Override
+ public IConfigurableService getService() {
+ return configureService;
+ }
+
+ @Override
+ public void initConfigurables(String namespace) {
+ configureService.initConfigurables(namespace);
+ }
+
+ @Override
+ public boolean refreshConfigurable(String namespace) {
+ return configureService.refreshConfigurable(namespace);
+ }
+
+ public void mockConfigurable(String namespace) {
+ refreshConfigurable(namespace);
+
+ }
+
+ @Override
+ public List<IConfigurable> queryConfigurable(String type) {
+ return configureService.queryConfigurable(type);
+ }
+
+ @Override
+ public <T extends IConfigurable> List<T> queryConfigurableByType(String type) {
+ return configureService.queryConfigurableByType(type);
+ }
+
+ @Override
+ public IConfigurable queryConfigurableByIdent(String type, String name) {
+ return configureService.queryConfigurableByIdent(type, name);
+ }
+
+ @Override
+ public IConfigurable queryConfigurableByIdent(String identification) {
+ return configureService.queryConfigurableByIdent(identification);
+ }
+
+ @Override
+ public void insert(IConfigurable configurable) {
+ configureService.insert(configurable);
+ ConfigurableUtil.refreshMock(configurable);
+ }
+
+ @Override
+ public void update(IConfigurable configurable) {
+ configureService.update(configurable);
+ }
+
+ @Override
+ public <T> Map<String, T> queryConfigurableMapByType(String type) {
+ return configureService.queryConfigurableMapByType(type);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T queryConfigurable(String configurableType, String name) {
+ return (T)queryConfigurableByIdent(configurableType, name);
+ }
+
+ //protected void insertConfigurable(JSONObject message, IConfigurable configurable) {
+ // ConfigurableUtil.insertConfigurable(message, configurable, this.configureService);
+ //}
+
+ @Override
+ public String getNamespace() {
+ if (AbstractConfigurableService.class.isInstance(configureService)) {
+ return ((AbstractConfigurableService)configureService).getNamespace();
+ }
+ return namespace;
+ }
+
+ @Override
+ public Collection<IConfigurable> findAll() {
+ return configureService.findAll();
+ }
+
+ @Override
+ public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+ return configureService.loadConfigurableFromStorage(type);
+ }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/model/Configure.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/model/Configure.java
new file mode 100644
index 0000000..121233d
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/model/Configure.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.model;
+
+import org.apache.rocketmq.streams.common.model.Entity;
+
+/**
+ * When configurables are stored in a database, this is the object mapped to the database table.
+ *
+ * <p>Plain bean: one private field per attribute with a getter/setter pair.
+ */
+public class Configure extends Entity {
+
+ private static final long serialVersionUID = 5668017348345235669L;
+
+ private String nameSpace;
+ private String type;
+ private String name;
+ // private String identification;
+ private String jsonValue;
+ private String modifyTime;
+ private String remark;
+ private int openRange;
+
+
+ /**
+ * Builds the {@code CREATE TABLE IF NOT EXISTS} statement for the configure table.
+ *
+ * <p>The table name is concatenated into the statement between backticks without any escaping.
+ * NOTE(review): callers presumably pass a trusted, fixed table name - confirm. The header comment embedded
+ * in the SQL always reads "dipper_configure", whatever name is passed in.
+ *
+ * @param tableName name of the table to create
+ * @return the DDL statement as a single string
+ */
+ public static String createTableSQL(String tableName){
+ return "/******************************************/\n"
+ + "/* TableName = dipper_configure */\n"
+ + "/******************************************/\n"
+ + "CREATE TABLE IF NOT EXISTS `"+tableName+"` (\n"
+ + " `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',\n"
+ + " `gmt_create` datetime NOT NULL COMMENT '创建时间',\n"
+ + " `gmt_modified` datetime NOT NULL COMMENT '修改时间',\n"
+ + " `namespace` varchar(32) NOT NULL COMMENT '项目标识',\n"
+ + " `type` varchar(32) NOT NULL COMMENT '配置类型',\n"
+ + " `name` varchar(128) NOT NULL COMMENT '配置名称',\n"
+ + " `json_value` text NOT NULL COMMENT '配置内容',\n"
+ + " `status` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '1:正在使用 0:已失效',\n"
+ + " PRIMARY KEY (`id`),\n"
+ + " UNIQUE KEY `uk_namespace_type_name` (`namespace`,`type`,`name`),\n"
+ + " KEY `idx_namespace` (`namespace`)\n"
+ + ") ENGINE=InnoDB AUTO_INCREMENT=1814834 DEFAULT CHARSET=utf8 COMMENT='统一接入配置项'\n"
+ + ";";
+ }
+
+ /** @return the namespace this configure belongs to */
+ public String getNameSpace() {
+ return nameSpace;
+ }
+
+ public void setNameSpace(String nameSpace) {
+ this.nameSpace = nameSpace;
+ }
+
+ /** @return the configure type */
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ /** @return the configure name */
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ // public String getIdentification() {
+ // return identification;
+ // }
+
+ // public void createIdentification() {
+ // this.identification = MapKeyUtil.createKey(nameSpace, type, name);
+ // }
+
+ /** @return the JSON payload stored for this configure */
+ public String getJsonValue() {
+ return jsonValue;
+ }
+
+ public void setJsonValue(String jsonValue) {
+ this.jsonValue = jsonValue;
+ }
+
+ /** @return the modify time as a string; the format is not defined in this class */
+ public String getModifyTime() {
+ return modifyTime;
+ }
+
+ public void setModifyTime(String modifyTime) {
+ this.modifyTime = modifyTime;
+ }
+
+ /** @return the free-text remark */
+ public String getRemark() {
+ return remark;
+ }
+
+ public void setRemark(String remark) {
+ this.remark = remark;
+ }
+
+ /** @return the open range value; its meaning is not defined in this class */
+ public int getOpenRange() {
+ return openRange;
+ }
+
+ public void setOpenRange(int openRange) {
+ this.openRange = openRange;
+ }
+
+ /** Renders the fields declared in this class; fields inherited from {@code Entity} are not included. */
+ @Override
+ public String toString() {
+ return "Configure{" + "nameSpace='" + nameSpace + '\'' + ", type='" + type + '\'' + ", name='" + name + '\''
+ + ", jsonValue='" + jsonValue + '\'' + ", modifyTime='" + modifyTime + '\'' + ", remark='" + remark + '\''
+ + ", openRange=" + openRange + '}';
+ }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractConfigurableService.java
new file mode 100644
index 0000000..9f55da8
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractConfigurableService.java
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service;
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.datatype.DataType;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
+import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.configuable.model.Configure;
+
+public abstract class AbstractConfigurableService implements IConfigurableService {
+
+ private static final Log LOG = LogFactory.getLog(AbstractConfigurableService.class);
+
+ private static final String CLASS_NAME = IConfigurableService.CLASS_NAME;
+
+ protected Map<String, List<IConfigurable>> type2ConfigurableMap = new HashMap<>();
+
+ protected Map<String, IConfigurable> name2ConfigurableMap = new HashMap<>();
+
+ protected Map<String, IConfigurable> configurableMap = new HashMap<>();
+
+ protected Properties properties;
+
+ protected transient String namespace;
+
+ /**
+ * Creates the service with the properties it later reads the polling interval and per-configurable
+ * field overrides from.
+ *
+ * @param properties service properties; stored as-is, may be {@code null}
+ */
+ public AbstractConfigurableService(Properties properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Creates the service without properties; {@link #setProperties(Properties)} can supply them later.
+ */
+ public AbstractConfigurableService() {
+ }
+
+ /**
+ * Looks up a configurable in the name index by an already-built identification key.
+ *
+ * @param identification key into {@code name2ConfigurableMap}
+ * @return the mapped configurable, or {@code null} when the key is unknown
+ */
+ @Override
+ public IConfigurable queryConfigurableByIdent(String identification) {
+ return name2ConfigurableMap.get(identification);
+ }
+
+ /**
+ * Builds the key used for {@code configurableMap} from namespace, type and name.
+ *
+ * @return the key produced by {@code MapKeyUtil.createKey} for the three parts
+ */
+ protected String getConfigureKey(String nameSpace, String type, String name) {
+ return MapKeyUtil.createKey(nameSpace, type, name);
+ }
+
+ /**
+ * Stores the configurable in {@code configurableMap} under its namespace/type/name key.
+ * A {@code null} argument is ignored.
+ *
+ * @param configurable object to cache
+ */
+ protected void updateConfiguresCache(IConfigurable configurable) {
+ if (configurable == null) {
+ return;
+ }
+ // The return value is discarded; presumably called for a side effect of toJson() - TODO confirm.
+ configurable.toJson();
+ String key = getConfigureKey(configurable.getNameSpace(), configurable.getType(), configurable.getConfigureName());
+ configurableMap.put(key, configurable);
+ }
+
+ /**
+ * Caches every element of the list via {@link #updateConfiguresCache(IConfigurable)}.
+ *
+ * @param configureList objects to cache; must not be {@code null}
+ */
+ protected void updateConfiguresCache(List<IConfigurable> configureList) {
+ for (IConfigurable iConfigurable : configureList) {
+ updateConfiguresCache(iConfigurable);
+ }
+ }
+
+    /**
+     * Tells whether the cached configurable stored under {@code key} matches the first list element that has
+     * the same key and a cached counterpart.
+     *
+     * @param key              namespace/type/name key into {@code configurableMap}
+     * @param newConfigureList candidates; every element is cast to {@link IConfigurable}
+     * @return the result of {@code ConfigurableUtil.compare} for the first candidate whose key equals
+     *         {@code key} and for which a cached object exists; {@code false} when there is none
+     */
+    protected boolean equals(String key, List<?> newConfigureList) {
+        for (Object candidate : newConfigureList) {
+            IConfigurable incoming = (IConfigurable)candidate;
+            String incomingKey =
+                getConfigureKey(incoming.getNameSpace(), incoming.getType(), incoming.getConfigureName());
+            if (!key.equals(incomingKey)) {
+                continue;
+            }
+            IConfigurable cached = configurableMap.get(key);
+            if (cached != null) {
+                return ConfigurableUtil.compare(cached, incoming);
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns the configurables of a type as a freshly allocated, typed list.
+     *
+     * @param type configurable type
+     * @return a new list holding every configurable registered for the type (unchecked-cast to {@code T});
+     *         empty when the type is unknown
+     */
+    @Override
+    public <T extends IConfigurable> List<T> queryConfigurableByType(String type) {
+        List<T> typed = new ArrayList<T>();
+        List<IConfigurable> registered = queryConfigurable(type);
+        if (registered != null) {
+            for (IConfigurable item : registered) {
+                typed.add((T)item);
+            }
+        }
+        return typed;
+    }
+
+    /**
+     * Reloads the configurables of a namespace and swaps them into the in-memory indexes.
+     *
+     * <p>The namespace is recorded first. When loading yields a successful result with a non-null list, the
+     * list is processed by {@link #checkAndUpdateConfigurables}, and every configurable it reports that
+     * implements {@link IAfterConfiguableRefreshListerner} is notified afterwards.
+     *
+     * @param namespace namespace to load
+     * @return {@code true} when a successful load result was processed, {@code false} otherwise
+     */
+    @Override
+    public boolean refreshConfigurable(String namespace) {
+        this.namespace = namespace;
+        Map<String, List<IConfigurable>> freshType2Configurables = new HashMap<>();
+        Map<String, IConfigurable> freshName2Configurable = new HashMap<>();
+        GetConfigureResult loaded = loadConfigurable(namespace);
+        if (loaded == null || !loaded.isQuerySuccess() || loaded.getConfigurables() == null) {
+            return false;
+        }
+
+        List<IConfigurable> configurables = loaded.getConfigurables();
+        List<IConfigurable> changed = checkAndUpdateConfigurables(namespace, configurables,
+            freshType2Configurables, freshName2Configurable, loaded.getConfigurables());
+        for (IConfigurable candidate : changed) {
+            if (candidate instanceof IAfterConfiguableRefreshListerner) {
+                IAfterConfiguableRefreshListerner listener = (IAfterConfiguableRefreshListerner)candidate;
+                listener.doProcessAfterRefreshConfigurable(this);
+            }
+        }
+        return true;
+    }
+
+ /**
+ * Looks up a configurable by type and name and casts it to the caller's expected type.
+ *
+ * @param configurableType configurable type
+ * @param name configure name
+ * @return the result of {@link #queryConfigurableByIdent(String, String)}, unchecked-cast to {@code T};
+ * {@code null} when nothing is registered
+ */
+ @Override
+ public <T> T queryConfigurable(String configurableType, String name) {
+ return (T)queryConfigurableByIdent(configurableType, name);
+ }
+
+    /**
+     * Feeds every loaded configurable through {@link #update(IConfigurable, Map, Map)}, destroys the ones that
+     * are no longer present, and publishes the two temporary maps as the new indexes.
+     *
+     * <p>A failure while processing one configurable is logged and does not stop the others.
+     * The {@code namespace} and {@code configures} parameters are not read by this implementation.
+     *
+     * @param namespace                namespace being refreshed
+     * @param configurables            freshly loaded configurables
+     * @param tempType2ConfigurableMap map filled with the new type index
+     * @param tempName2ConfigurableMap map filled with the new name index
+     * @param configures               unused
+     * @return the configurables for which {@code update} reported a change
+     */
+    protected List<IConfigurable> checkAndUpdateConfigurables(String namespace, List<IConfigurable> configurables,
+                                                              Map<String, List<IConfigurable>> tempType2ConfigurableMap,
+                                                              Map<String, IConfigurable> tempName2ConfigurableMap,
+                                                              List configures) {
+        List<IConfigurable> changed = new ArrayList<>();
+        for (IConfigurable candidate : configurables) {
+            try {
+                if (update(candidate, tempName2ConfigurableMap, tempType2ConfigurableMap)) {
+                    changed.add(candidate);
+                }
+            } catch (Exception e) {
+                LOG.error("组件初始化异常:" + e.getMessage() + ",name=" + candidate.getConfigureName(), e);
+            }
+        }
+        // Destroy leftovers while the old name index is still in place, then publish the new indexes.
+        destroyOldConfigurables(tempName2ConfigurableMap);
+        this.name2ConfigurableMap = tempName2ConfigurableMap;
+        this.type2ConfigurableMap = tempType2ConfigurableMap;
+        return changed;
+    }
+
+    /**
+     * Destroys every currently indexed configurable whose key is missing from the new name index.
+     *
+     * @param tempName2ConfigurableMap the name index about to be published
+     */
+    private void destroyOldConfigurables(Map<String, IConfigurable> tempName2ConfigurableMap) {
+        for (Map.Entry<String, IConfigurable> current : this.name2ConfigurableMap.entrySet()) {
+            boolean stillPresent = tempName2ConfigurableMap.containsKey(current.getKey());
+            if (!stillPresent) {
+                destroyOldConfigurable(current.getValue());
+            }
+        }
+    }
+
+    /**
+     * Destroys a retired configurable (when it is an {@link AbstractConfigurable}) and drops it from
+     * {@code configurableMap}.
+     *
+     * @param oldConfigurable configurable being retired; must not be {@code null}
+     */
+    private void destroyOldConfigurable(IConfigurable oldConfigurable) {
+        if (oldConfigurable instanceof AbstractConfigurable) {
+            AbstractConfigurable destroyable = (AbstractConfigurable)oldConfigurable;
+            destroyable.destroy();
+        }
+        String cacheKey = getConfigureKey(
+            oldConfigurable.getNameSpace(), oldConfigurable.getType(), oldConfigurable.getConfigureName());
+        configurableMap.remove(cacheKey);
+    }
+
+    /**
+     * Hands this service to the configurable (when it is an {@link AbstractConfigurable}) and then calls
+     * its {@code init()}.
+     *
+     * @param configurable configurable to initialize; must not be {@code null}
+     */
+    protected void initConfigurable(IConfigurable configurable) {
+        if (configurable instanceof AbstractConfigurable) {
+            ((AbstractConfigurable)configurable).setConfigurableService(this);
+        }
+        configurable.init();
+    }
+
+    /**
+     * Internal use only: scheduler driving the periodic refresh. Stays {@code null} until polling is enabled.
+     */
+    private ScheduledExecutorService scheduledExecutorService;
+
+    /**
+     * Loads the configurables of the namespace once and, when a positive polling interval is configured under
+     * {@code AbstractComponent.POLLING_TIME} (seconds), schedules a periodic refresh.
+     *
+     * <p>If a scheduler from an earlier call exists it is shut down before the new one is created, so repeated
+     * initialization neither leaks pool threads nor leaves a stale refresh task running.
+     *
+     * @param namespace namespace to load and keep refreshed
+     * @throws NumberFormatException if the configured polling interval is not a valid long
+     */
+    @Override
+    public void initConfigurables(final String namespace) {
+        refreshConfigurable(namespace);
+        long pollingTime = -1;
+        if (this.properties != null) {
+            String pollingTimeStr = this.properties.getProperty(AbstractComponent.POLLING_TIME);
+            if (StringUtil.isNotEmpty(pollingTimeStr)) {
+                pollingTime = Long.parseLong(pollingTimeStr.trim());
+            }
+        }
+        if (pollingTime <= 0) {
+            return;
+        }
+        if (scheduledExecutorService != null) {
+            // shutdown() lets a refresh that is already running finish and cancels the periodic task.
+            scheduledExecutorService.shutdown();
+        }
+        scheduledExecutorService = new ScheduledThreadPoolExecutor(3);
+        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    refreshConfigurable(namespace);
+                } catch (Exception e) {
+                    // An exception escaping run() would cancel all further executions, so log and carry on.
+                    LOG.error("Load configurables error:" + e.getMessage(), e);
+                }
+            }
+        }, pollingTime, pollingTime, TimeUnit.SECONDS);
+    }
+ // @Override
+ // public List<IConfigurable> queryConfigurable(String nameSpace) {
+ // return namespace2ConfigurableMap.get(nameSpace);
+ // }
+
+ /**
+ * Returns the configurables registered for a type.
+ *
+ * @param type configurable type
+ * @return the list held in the type index (not a copy), or {@code null} when the type is unknown
+ */
+ @Override
+ public List<IConfigurable> queryConfigurable(String type) {
+ String key = MapKeyUtil.createKey(type);
+ return type2ConfigurableMap.get(key);
+ }
+
+ /**
+ * Looks up a configurable by type and name.
+ *
+ * @param type configurable type
+ * @param name configure name
+ * @return the configurable stored under the type/name key, or {@code null} when there is none
+ */
+ @Override
+ public IConfigurable queryConfigurableByIdent(String type, String name) {
+ String key = MapKeyUtil.createKey(type, name);
+ return name2ConfigurableMap.get(key);
+ }
+
+ /**
+ * Loads the configuration of a namespace from the underlying storage.
+ *
+ * <p>Callers in this class read the result through {@code isQuerySuccess()} and
+ * {@code getConfigurables()}; {@link #refreshConfigurable(String)} treats a {@code null} result as a
+ * failed load.
+ *
+ * @param namespace namespace to load
+ * @return the load result
+ */
+ protected abstract GetConfigureResult loadConfigurable(String namespace);
+
+ /**
+ * Updates a configurable by delegating to {@link #updateConfigurable(IConfigurable)}.
+ * The in-memory indexes are not touched here.
+ *
+ * @param configurable configurable to update
+ */
+ @Override
+ public void update(IConfigurable configurable) {
+ // update(configurable,name2ConfigurableMap,type2ConfigurableMap);
+ updateConfigurable(configurable);
+ }
+
+ /**
+ * Storage-specific update, called by {@link #update(IConfigurable)}.
+ * NOTE(review): presumably persists the changed configurable - confirm against the implementations.
+ *
+ * @param configurable configurable to update
+ */
+ protected abstract void updateConfigurable(IConfigurable configurable);
+
+ /**
+ * Storage-specific insert, called by {@link #insert(IConfigurable)}.
+ * NOTE(review): presumably persists the new configurable - confirm against the implementations.
+ *
+ * @param configurable configurable to insert
+ */
+ protected abstract void insertConfigurable(IConfigurable configurable);
+
+ /**
+ * Registers one loaded configurable in the given (new) indexes, reusing the currently published
+ * instance when it is unchanged.
+ *
+ * <p>The map parameters shadow the fields of the same name: the existence check and the old instance
+ * come from {@code this.name2ConfigurableMap} (the published index), while the result is written into
+ * the maps passed in.
+ *
+ * @param configurable loaded configurable; {@code null} is ignored
+ * @param name2ConfigurableMap name index being built
+ * @param type2ConfigurableMap type index being built
+ * @return {@code true} when the configurable is new or replaced an old instance and was initialized,
+ * {@code false} when the old instance was kept or the argument was {@code null}
+ */
+ protected boolean update(IConfigurable configurable, Map<String, IConfigurable> name2ConfigurableMap,
+ Map<String, List<IConfigurable>> type2ConfigurableMap) {
+ if (configurable == null) {
+ return false;
+ }
+
+ boolean isUpdate = false;
+ List<IConfigurable> configurableList = new ArrayList<>();
+ configurableList.add(configurable);
+
+ String nameKey = MapKeyUtil.createKey(configurable.getType(), configurable.getConfigureName());
+ if (this.name2ConfigurableMap.containsKey(nameKey)) {
+ // NOTE(review): this key uses the service namespace, whereas equals() and updateConfiguresCache()
+ // build keys from configurable.getNameSpace(); if the two differ, equals() returns false and the
+ // configurable is re-initialized on every refresh - confirm this is intended.
+ String configureKey = getConfigureKey(namespace, configurable.getType(), configurable.getConfigureName());
+ IConfigurable oldConfigurable = this.name2ConfigurableMap.get(nameKey);
+ if (equals(configureKey, configurableList)) {
+ // Unchanged: keep serving the already initialized instance.
+ configurable = oldConfigurable;
+ // name2ConfigurableMap.put(nameKey, name2ConfigurableMap.get(nameKey));
+ } else {
+ // Changed: retire the old instance before initializing its replacement.
+ destroyOldConfigurable(oldConfigurable);
+ initConfigurable(configurable);
+ isUpdate = true;
+ }
+ } else {
+ initConfigurable(configurable);
+ isUpdate = true;
+ }
+ updateConfiguresCache(configurable);
+ name2ConfigurableMap.put(nameKey, configurable);
+ String typeKey = MapKeyUtil.createKey(configurable.getType());
+ // put2Map(namespace2ConfigurableMap, namespace, configurable);
+ put2Map(type2ConfigurableMap, typeKey, configurable);
+ return isUpdate;
+ }
+
+ /**
+ * Inserts a configurable by delegating to {@link #insertConfigurable(IConfigurable)}.
+ * The in-memory indexes are not touched here.
+ *
+ * @param configurable configurable to insert
+ */
+ @Override
+ public void insert(IConfigurable configurable) {
+ // update(configurable,name2ConfigurableMap,type2ConfigurableMap);
+ insertConfigurable(configurable);
+ }
+
+    /**
+     * Hook for querying data across namespaces: loads the configurables of each given namespace straight
+     * through {@link #loadConfigurable(String)} and concatenates them.
+     *
+     * <p>The in-memory indexes of this service are neither consulted nor modified.
+     *
+     * @param namespaces namespaces to load; {@code null} or empty yields an empty list
+     * @return a new list with the configurables of all namespaces, in argument order
+     * @throws RuntimeException if loading a namespace fails, i.e. the load result is {@code null} or reports
+     *                          an unsuccessful query
+     */
+    public List<IConfigurable> queryConfiguableByNamespace(String... namespaces) {
+        List<IConfigurable> configurables = new ArrayList<>();
+        if (namespaces == null || namespaces.length == 0) {
+            return configurables;
+        }
+        for (String namespace : namespaces) {
+            GetConfigureResult result = loadConfigurable(namespace);
+            // A null result is a failed load (as in refreshConfigurable); report it instead of hitting an NPE.
+            if (result == null || !result.isQuerySuccess()) {
+                throw new RuntimeException("Load configurable error, the namespace is " + namespace);
+            }
+            List<IConfigurable> loaded = result.getConfigurables();
+            if (loaded != null && !loaded.isEmpty()) {
+                configurables.addAll(loaded);
+            }
+        }
+        return configurables;
+    }
+
+    /**
+     * Appends a configurable to a map whose values are lists, creating the list for the key when it does
+     * not exist yet.
+     *
+     * @param map          multi-valued map to add to
+     * @param key          key whose list receives the configurable
+     * @param configurable value to append
+     */
+    protected void put2Map(Map<String, List<IConfigurable>> map, String key, IConfigurable configurable) {
+        map.computeIfAbsent(key, missingKey -> new ArrayList<IConfigurable>()).add(configurable);
+    }
+
+ /**
+ * Returns every configurable in the current name index.
+ *
+ * @return the {@code values()} collection of {@code name2ConfigurableMap} itself, not a copy
+ */
+ @Override
+ public Collection<IConfigurable> findAll() {
+ return name2ConfigurableMap.values();
+ }
+
+    /**
+     * Converts a configurable into its {@link Configure} storage representation.
+     *
+     * <p>Type, name and namespace are copied. When the configurable serializes to a non-empty JSON string,
+     * that JSON is extended with the implementation class name under {@code CLASS_NAME} and stored as the
+     * JSON value; otherwise the JSON value is left unset.
+     *
+     * @param configurable configurable to convert; must not be {@code null}
+     * @return the populated configure
+     */
+    protected Configure createConfigure(IConfigurable configurable) {
+        Configure configure = new Configure();
+        configure.setType(configurable.getType());
+        configure.setName(configurable.getConfigureName());
+        configure.setNameSpace(configurable.getNameSpace());
+
+        String serialized = configurable.toJson();
+        if (StringUtil.isEmpty(serialized)) {
+            return configure;
+        }
+        JSONObject payload = JSONObject.parseObject(serialized);
+        payload.put(CLASS_NAME, configurable.getClass().getName());
+        configure.setJsonValue(payload.toJSONString());
+        return configure;
+    }
+
+    /**
+     * Returns the configurables of a type keyed by their configure name.
+     *
+     * @param type configurable type
+     * @return a new map from configure name to configurable (unchecked-cast to {@code T}); empty when the
+     *         type is unknown
+     */
+    @Override
+    public <T> Map<String, T> queryConfigurableMapByType(String type) {
+        Map<String, T> byName = new HashMap<String, T>();
+        List<IConfigurable> registered = queryConfigurable(type);
+        if (registered != null) {
+            for (IConfigurable item : registered) {
+                byName.put(item.getConfigureName(), (T)item);
+            }
+        }
+        return byName;
+    }
+
+    /**
+     * Converts configures into configurables, silently dropping the ones that cannot be converted.
+     *
+     * @param configures storage records; {@code null} yields an empty list
+     * @return a new list with the successfully converted configurables, in input order
+     */
+    protected List<IConfigurable> convert(List<Configure> configures) {
+        List<IConfigurable> converted = new ArrayList<IConfigurable>();
+        if (configures == null) {
+            return converted;
+        }
+        for (Configure record : configures) {
+            IConfigurable configurable = convert(record);
+            if (configurable == null) {
+                continue;
+            }
+            converted.add(configurable);
+        }
+        return converted;
+    }
+
+    /**
+     * Instantiates a configurable from its stored JSON.
+     *
+     * <p>The implementation class is read from the {@code CLASS_NAME} entry of the JSON. The new instance gets
+     * name, namespace and type assigned, receives this service when it is an {@link AbstractConfigurable},
+     * and is finally populated through {@code toObject(jsonValue)}.
+     *
+     * @param namespace namespace to assign
+     * @param type      type to assign
+     * @param name      configure name to assign
+     * @param jsonValue stored JSON; an empty value yields {@code null}
+     * @return the populated configurable, or {@code null} when the JSON is empty or no instance could be
+     *         created for the class name
+     */
+    protected IConfigurable createConfigurableFromJson(String namespace, String type, String name, String jsonValue) {
+        if (StringUtil.isEmpty(jsonValue)) {
+            return null;
+        }
+        String className = JSONObject.parseObject(jsonValue).getString(CLASS_NAME);
+        IConfigurable instance = createConfigurable(className);
+        if (instance == null) {
+            return null;
+        }
+        instance.setConfigureName(name);
+        instance.setNameSpace(namespace);
+        instance.setType(type);
+        if (instance instanceof AbstractConfigurable) {
+            ((AbstractConfigurable)instance).setConfigurableService(this);
+        }
+        instance.toObject(jsonValue);
+        return instance;
+    }
+
+ /**
+ * Entry point that lets subclasses change the configurable produced for a configure.
+ * The default implementation delegates to {@link #convertConfigurable(Configure)}.
+ *
+ * @param configure storage record to convert
+ * @return the converted configurable, or {@code null} when conversion fails
+ */
+ protected IConfigurable convert(Configure configure) {
+
+ return convertConfigurable(configure);
+ }
+
+    /**
+     * Converts one storage record into a configurable.
+     *
+     * <p>The configurable is created from the record's JSON; when it is an {@link Entity}, the record's id and
+     * creation/modification timestamps are copied over. Finally {@link #convertPost(IConfigurable)} applies
+     * property-driven field overrides.
+     *
+     * <p>Any failure is logged and reported as {@code null}. A record whose JSON yields no configurable is
+     * returned as {@code null} directly, without running the post-processing on a null object.
+     *
+     * @param configure storage record to convert
+     * @return the converted configurable, or {@code null} when nothing could be created or conversion failed
+     */
+    protected IConfigurable convertConfigurable(Configure configure) {
+        try {
+            String jsonString = configure.getJsonValue();
+            IConfigurable configurable =
+                createConfigurableFromJson(configure.getNameSpace(), configure.getType(), configure.getName(),
+                    jsonString);
+            if (configurable == null) {
+                return null;
+            }
+            if (configurable instanceof Entity) {
+                // The Configurable interface is also used by third-party packages, so these setters could not
+                // be added to the interface and live on the abstract class instead; hence the cast.
+                Entity entity = (Entity)configurable;
+                entity.setId(configure.getId());
+                entity.setGmtCreate(configure.getGmtCreate());
+                entity.setGmtModified(configure.getGmtModified());
+            }
+            convertPost(configurable);
+            return configurable;
+        } catch (Exception e) {
+            // Plain concatenation (not configure.toString()) so a null record cannot throw again here.
+            LOG.error("转换异常:" + configure, e);
+            return null;
+        }
+    }
+
+    /**
+     * Hook for changing properties of a freshly converted configurable.
+     *
+     * <p>The default implementation looks up a property named after the configurable's
+     * namespace/type/name key. Its value is a comma-separated list of {@code fieldName:value} pairs; each
+     * well-formed pair is converted to the field's type and written into the configurable by reflection.
+     * Entries that do not split into exactly two parts, or whose field type has no data type, are skipped;
+     * a failure on one entry is logged and does not affect the others.
+     *
+     * @param configurable configurable to post-process
+     */
+    @SuppressWarnings("rawtypes")
+    protected void convertPost(IConfigurable configurable) {
+        if (this.properties == null) {
+            return;
+        }
+        String identification =
+            MapKeyUtil.createKey(configurable.getNameSpace(), configurable.getType(), configurable.getConfigureName());
+        String overrides = this.properties.getProperty(identification);
+        if (StringUtil.isEmpty(overrides)) {
+            return;
+        }
+        for (String fieldName2Value : overrides.split(",")) {
+            try {
+                String[] nameAndValue = fieldName2Value.split(":");
+                if (nameAndValue.length != 2) {
+                    continue;
+                }
+                String fieldName = nameAndValue[0];
+                Class fieldType = ReflectUtil.getBeanFieldType(configurable.getClass(), fieldName);
+                DataType dataType = DataTypeUtil.createDataType(fieldType, null);
+                if (dataType != null) {
+                    Object fieldValue = dataType.getData(nameAndValue[1]);
+                    ReflectUtil.setBeanFieldValue(configurable, fieldName, fieldValue);
+                }
+            } catch (Exception e) {
+                LOG.error("convert post error " + fieldName2Value, e);
+            }
+        }
+    }
+
+ /**
+ * Creates a configurable object for a class name via {@code ReflectUtil.forInstance}.
+ *
+ * @param className class name
+ * @return the new instance; {@link #createConfigurableFromJson} treats a {@code null} result as
+ * "cannot be created"
+ */
+ @SuppressWarnings("rawtypes")
+ protected IConfigurable createConfigurable(String className) {
+ return ReflectUtil.forInstance(className);
+ }
+
+ /**
+ * Result of {@link #loadConfigurable(String)}: a success flag plus the loaded configurables.
+ * Declared as a (non-static) inner class, so instances are tied to an enclosing service instance.
+ */
+ public class GetConfigureResult {
+
+ // Whether the load succeeded; defaults to false until set.
+ private boolean querySuccess;
+ // Loaded configurables; null until set.
+ private List<IConfigurable> configurables;
+
+ public boolean isQuerySuccess() {
+ return querySuccess;
+ }
+
+ public void setQuerySuccess(boolean querySuccess) {
+ this.querySuccess = querySuccess;
+ }
+
+ public List<IConfigurable> getConfigurables() {
+ return configurables;
+ }
+
+ public void setConfigurables(List<IConfigurable> configurables) {
+ this.configurables = configurables;
+ }
+ }
+
+ /**
+ * @return the namespace recorded by the most recent {@link #refreshConfigurable(String)} call,
+ * or {@code null} before the first refresh
+ */
+ @Override
+ public String getNamespace() {
+ return namespace;
+ }
+
+ /** @return the properties this service was given; may be {@code null} */
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Replaces the properties used by this service.
+ *
+ * @param properties new properties; stored as-is
+ */
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractSupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractSupportParentConfigureService.java
new file mode 100644
index 0000000..390a119
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractSupportParentConfigureService.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableListerner;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * namespace 分层,支持顶级命名空间,顶级命名空间的对象,所有命名空间都可见。顶级命名空间是固定值IConfigurableService.PARENT_CHANNEL_NAME_SPACE
+ */
+public abstract class AbstractSupportParentConfigureService extends AbstractConfigurableService
+ implements IConfigurableService {
+
+ private static final Log LOG = LogFactory.getLog(AbstractSupportParentConfigureService.class);
+ protected IConfigurableService configureService = null;
+ protected IConfigurableService parentConfigureService = null;
+ //protected IConfigurableService shareConfigureService = null;
+ protected Properties properties;
+
+ public AbstractSupportParentConfigureService() {
+ super(null);
+ }
+
+ public void initMethod(Properties property) {
+ this.properties = property;
+ initBeforeInitConfigurable(property);
+ }
+
+ protected abstract void initBeforeInitConfigurable(Properties property);
+
+
+
+
+ @Override
+ public void initConfigurables(String namespace) {
+
+ if (!IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(namespace)) {
+ parentConfigureService.initConfigurables(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+ } else {
+ parentConfigureService = null;
+ }
+ configureService.initConfigurables(namespace);
+ }
+
+ @Override
+ public boolean refreshConfigurable(String namespace) {
+
+ if (!IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(namespace)) {
+ parentConfigureService.refreshConfigurable(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+ // initShareConfigurableService(namespace);
+ }
+ configureService.refreshConfigurable(namespace);
+ return true;
+ }
+
+
+
+ @Override
+ public List<IConfigurable> queryConfigurable(String type) {
+ List<IConfigurable> result = configureService.queryConfigurable(type);
+ if (result == null) {
+ result = new ArrayList<>();
+ }
+ //if (shareConfigureService != null) {
+ // List<IConfigurable> share = shareConfigureService.queryConfigurable(type);
+ // if (share != null) {
+ // result.addAll(share);
+ // }
+ //}
+ if (parentConfigureService == null) {
+ return result;
+ }
+ List<IConfigurable> parent = parentConfigureService.queryConfigurable(type);
+ if (parent != null) {
+ result.addAll(parent);
+ }
+ return result;
+ }
+
+ @Override
+ public IConfigurable queryConfigurableByIdent(String type, String name) {
+ IConfigurable configurable = configureService.queryConfigurableByIdent(type, name);
+ if (configurable != null) {
+ return configurable;
+ }
+ if (parentConfigureService == null) {
+ return null;
+ }
+ //if (shareConfigureService != null) {
+ // configurable = shareConfigureService.queryConfigurableByIdent(type, name);
+ //}
+ if (configurable != null) {
+ return configurable;
+ }
+ return parentConfigureService.queryConfigurableByIdent(type, name);
+ }
+
+ @Override
+ public IConfigurable queryConfigurableByIdent(String identification) {
+ IConfigurable configurable = configureService.queryConfigurableByIdent(identification);
+ if (configurable != null) {
+ return configurable;
+ }
+ if (parentConfigureService == null) {
+ return null;
+ }
+ //if (shareConfigureService != null) {
+ // configurable = shareConfigureService.queryConfigurableByIdent(identification);
+ //}
+ if (configurable != null) {
+ return configurable;
+ }
+ return parentConfigureService.queryConfigurableByIdent(identification);
+ }
+
+ @Override
+ protected void insertConfigurable(IConfigurable configurable) {
+ if (parentConfigureService != null && configurable.getNameSpace()
+ .equals(IConfigurableService.PARENT_CHANNEL_NAME_SPACE)) {
+ parentConfigureService.insert(configurable);
+ } else {
+ configureService.insert(configurable);
+ }
+ }
+
+ @Override
+ protected void updateConfigurable(IConfigurable configurable) {
+ if (parentConfigureService != null && configurable.getNameSpace()
+ .equals(IConfigurableService.PARENT_CHANNEL_NAME_SPACE)) {
+ parentConfigureService.update(configurable);
+ } else {
+ configureService.update(configurable);
+ }
+ }
+
+ @Override
+ public <T> T queryConfigurable(String configurableType, String name) {
+ return (T) queryConfigurableByIdent(configurableType, name);
+ }
+
+ @Override
+ protected GetConfigureResult loadConfigurable(String namespace) {
+ return null;
+ }
+
+ //protected void initShareConfigurableService(String namespace) {
+ // if (parentConfigureService == null) {
+ // return;
+ // }
+ // shareConfigureService = new AbstractReadOnlyConfigurableService() {
+ //
+ // @Override
+ // public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+ // refreshConfigurable(namespace);
+ // return queryConfigurableByType(type);
+ // }
+ //
+ // @Override
+ // protected List<IConfigurable> loadConfigurables(String namespace) {
+ // List<IConfigurable> parent = parentConfigureService.queryConfigurable(ShareConfiguable.TYPE);
+ // List<IConfigurable> shareConfigurables = new ArrayList<>();
+ // if (parent == null) {
+ // return shareConfigurables;
+ // }
+ // for (IConfigurable configurable : parent) {
+ // ShareConfiguable shareConfiguable = (ShareConfiguable) configurable;
+ // if (shareConfiguable.getShareAll() || shareConfiguable.getShareNameSpaces().contains(namespace)) {
+ // String sharedNameSpace = shareConfiguable.getSharedNameSpace();
+ // String sharedType = shareConfiguable.getSharedType();
+ // String sharedName = shareConfiguable.getSharedName();
+ // List<IConfigurable> sharedConfigrables =
+ // createAndQueryConfigurable(sharedNameSpace, sharedType, sharedName);
+ // if (sharedConfigrables != null) {
+ // shareConfigurables.addAll(sharedConfigrables);
+ // }
+ // }
+ // }
+ // return shareConfigurables;
+ // }
+ //
+ //
+ // };
+ // shareConfigureService.refreshConfigurable(namespace);
+ //
+ //}
+
+ protected List<IConfigurable> createAndQueryConfigurable(String sharedNameSpace, String sharedType,
+ String sharedName) {
+ IConfigurableService innerSharedConfigurableService =
+ ConfigurableServiceFactory.createConfigurableService(properties);
+ innerSharedConfigurableService.refreshConfigurable(sharedNameSpace);
+ if (StringUtil.isNotEmpty(sharedName)) {
+ List<IConfigurable> configurables = new ArrayList<>();
+ IConfigurable configurable = innerSharedConfigurableService.queryConfigurableByIdent(sharedType, sharedName);
+ configurables.add(configurable);
+ return configurables;
+ } else {
+ return innerSharedConfigurableService.queryConfigurable(sharedType);
+ }
+
+ }
+
+
+ @Override
+ public Collection<IConfigurable> findAll() {
+ List<IConfigurable> configurables=new ArrayList<>();
+ if (parentConfigureService != null ) {
+ Collection<IConfigurable> tmp=parentConfigureService.findAll();
+ if(tmp!=null||tmp.size()>0){
+ configurables.addAll(tmp);
+ }
+ }
+ Collection<IConfigurable> tmp=configureService.findAll();
+ if(tmp!=null||tmp.size()>0){
+ configurables.addAll(tmp);
+ }
+ return configurables;
+ }
+
+ public IConfigurableService getConfigureService() {
+ return configureService;
+ }
+
+ @Override
+ public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+ List<T> configurables=new ArrayList<>();
+ if (parentConfigureService != null ) {
+ Collection<T> tmp=parentConfigureService.loadConfigurableFromStorage(type);
+ if(tmp!=null||tmp.size()>0){
+ configurables.addAll(tmp);
+ }
+ }
+ Collection<T> tmp=configureService.loadConfigurableFromStorage(type);
+ if(tmp!=null||tmp.size()>0){
+ configurables.addAll(tmp);
+ }
+ return configurables;
+ }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServcieType.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServcieType.java
new file mode 100644
index 0000000..f2b72ab
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServcieType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+
+ /**
+  * Aliases for the service-name constants declared on {@link IConfigurableService}.
+  *
+  * @deprecated each constant here simply mirrors the constant of the same name on
+  *     {@link IConfigurableService}.
+  */
+ @Deprecated
+ public class ConfigurableServcieType {
+
+     /** Mirrors {@link IConfigurableService#HTTP_SERVICE_NAME}. */
+     public static final String HTTP_SERVICE_NAME = IConfigurableService.HTTP_SERVICE_NAME;
+     /** Mirrors {@link IConfigurableService#FILE_SERVICE_NAME}. */
+     public static final String FILE_SERVICE_NAME = IConfigurableService.FILE_SERVICE_NAME;
+     /** Mirrors {@link IConfigurableService#MEMORY_SERVICE_NAME}. */
+     public static final String MEMORY_SERVICE_NAME = IConfigurableService.MEMORY_SERVICE_NAME;
+     /** Mirrors {@link IConfigurableService#DEFAULT_SERVICE_NAME}. */
+     public static final String DEFAULT_SERVICE_NAME = IConfigurableService.DEFAULT_SERVICE_NAME;
+
+ }
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServiceFactory.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServiceFactory.java
new file mode 100644
index 0000000..307dcae
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServiceFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
+
+ /**
+  * Factory that resolves an {@link IConfigurableService} by service name through the service loader
+  * and initialises it from a set of properties.
+  */
+ public class ConfigurableServiceFactory {
+     private static final ServiceLoaderComponent<IConfigurableService> configurableServiceLoaderComponent = ServiceLoaderComponent.getInstance(IConfigurableService.class);
+     private static final Log LOG = LogFactory.getLog(ConfigurableServiceFactory.class);
+
+     /**
+      * Creates a service of the type named by {@code ConfigureFileKey.CONNECT_TYPE}, falling back to
+      * {@code IConfigurableService.DEFAULT_SERVICE_NAME} when that property is empty. The service
+      * receives a private copy of the properties.
+      *
+      * @param properties configuration; null is treated as empty
+      * @return the initialised service, or null when creation fails (the failure is logged)
+      */
+     public static IConfigurableService createConfigurableService(Properties properties) {
+         try {
+             Properties copy = new Properties();
+             if (properties != null) {
+                 copy.putAll(properties);
+             }
+             String type = copy.getProperty(ConfigureFileKey.CONNECT_TYPE);
+             if (StringUtil.isEmpty(type)) {
+                 type = IConfigurableService.DEFAULT_SERVICE_NAME;
+             }
+             IConfigurableService configurableService = getConfigurableServiceType(type);
+             if (configurableService instanceof AbstractSupportParentConfigureService) {
+                 ((AbstractSupportParentConfigureService)configurableService).initMethod(copy);
+             }
+             return configurableService;
+         } catch (Exception e) {
+             LOG.error("create ConfigurableService error", e);
+             return null;
+         }
+
+     }
+
+     /**
+      * Looks up the service registered under {@code type} and returns a fresh instance of its class.
+      *
+      * @param type service name to resolve
+      * @return a new instance of the resolved implementation class
+      * @throws IllegalArgumentException when the loader yields no service for {@code type}
+      */
+     public static IConfigurableService getConfigurableServiceType(String type) {
+         IConfigurableService prototype = configurableServiceLoaderComponent.getService().loadService(type);
+         if (prototype == null) {
+             // Fail with the offending type instead of a bare NullPointerException.
+             throw new IllegalArgumentException("no configurable service registered for type " + type);
+         }
+         return ReflectUtil.forInstance(prototype.getClass().getName());
+     }
+ }
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileConfigureService.java
new file mode 100644
index 0000000..d251bda
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileConfigureService.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service.impl;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configuable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.configuable.model.Configure;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+ /**
+  * Configurable service that keeps configures as rows of a text file. Each row holds the columns
+  * namespace, type, name, json value, create date, modified date and id, joined by {@code SIGN}.
+  */
+ public class FileConfigureService extends AbstractConfigurableService {
+
+     /** Property key under which the storage file path is looked up. */
+     public static final String FILE_PATH_NAME = IConfigurableService.FILE_PATH_NAME;
+     private static final Log LOG = LogFactory.getLog(FileConfigureService.class);
+     /** File name used when no path is configured. */
+     private static final String DEFAULT_FILE_NAME = "dipper_configure.cs";
+     /** Separator between the columns of a row. */
+     private static final String SIGN = "&&&&";
+     public String fileName;
+
+     public FileConfigureService(Properties properties) {
+         super(properties);
+         initService(properties.getProperty(FILE_PATH_NAME));
+     }
+
+     /**
+      * Resolves {@link #fileName}. With an empty argument the file is placed next to the location
+      * this class was loaded from (next to the jar when that location is a jar).
+      *
+      * @param fileAndPath configured file path, may be empty
+      */
+     protected void initService(String fileAndPath) {
+         if (StringUtil.isEmpty(fileAndPath)) {
+             String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+             if (path.endsWith(".jar")) {
+                 // A URL path always uses '/', so File.separator alone misses it on Windows.
+                 int index = Math.max(path.lastIndexOf('/'), path.lastIndexOf(File.separatorChar));
+                 if (index >= 0) {
+                     path = path.substring(0, index);
+                 }
+             }
+             fileName = FileUtil.concatFilePath(path, DEFAULT_FILE_NAME);
+         } else {
+             fileName = fileAndPath;
+         }
+         LOG.info("load file from path = " + fileName);
+     }
+
+     /**
+      * Loads the configurables of a namespace from the file.
+      *
+      * @param namespace namespace to load
+      * @return a result whose success flag is false when loading threw an exception
+      */
+     @Override
+     protected GetConfigureResult loadConfigurable(String namespace) {
+         GetConfigureResult result = new GetConfigureResult();
+         try {
+             List<Configure> configures = selectOpening(namespace);
+             List<IConfigurable> configurables = convert(configures);
+             LOG.info("load configure namespace=" + namespace + " count=" + configures.size());
+             result.setConfigurables(configurables);
+             // Marks whether the query succeeded.
+             result.setQuerySuccess(true);
+         } catch (Exception e) {
+             result.setQuerySuccess(false);
+             LOG.error("load configurable error ", e);
+         }
+         return result;
+     }
+
+     /**
+      * Reads every row of the file and keeps the configures of the given namespace.
+      */
+     protected List<Configure> selectOpening(String namespace) {
+         List<String> list = loadFileLine(fileName);
+         List<Configure> configures = convert2Configure(list);
+         return filter(configures, namespace);
+     }
+
+     /**
+      * Keeps the configures whose namespace equals {@code namespace}.
+      *
+      * @param configures candidates; null yields an empty list
+      * @param namespace namespace to match, must not be empty
+      * @return a new list with the matching configures
+      * @throws RuntimeException when {@code namespace} is empty
+      */
+     protected List<Configure> filter(List<Configure> configures, String namespace) {
+         if (configures == null) {
+             return new ArrayList<>();
+         }
+         if (StringUtil.isEmpty(namespace)) {
+             throw new RuntimeException("namespace can not empty ");
+         }
+         List<Configure> matched = new ArrayList<>();
+         for (Configure configure : configures) {
+             if (namespace.equals(configure.getNameSpace())) {
+                 matched.add(configure);
+             }
+         }
+         return matched;
+     }
+
+     /**
+      * Writes the configure to the file, replacing the first row with the same
+      * namespace/type/name key or appending a new row when there is none.
+      */
+     @Override
+     protected void insertConfigurable(IConfigurable configure) {
+         if (configure == null) {
+             LOG.warn("insert configure is null");
+             return;
+         }
+         // Serialise once and reuse the row for both the replace and the append path.
+         String row = configure2String(configure);
+
+         List<String> rows = loadFileLine(fileName);
+         if (rows == null) {
+             rows = new ArrayList<>();
+         }
+         List<Configure> configures = convert2Configure(rows);
+         String newKey =
+             MapKeyUtil.createKey(configure.getNameSpace(), configure.getType(), configure.getConfigureName());
+         boolean replaced = false;
+         for (int i = 0; i < configures.size(); i++) {
+             Configure existing = configures.get(i);
+             String existingKey =
+                 MapKeyUtil.createKey(existing.getNameSpace(), existing.getType(), existing.getName());
+             if (existingKey.equals(newKey)) {
+                 rows.set(i, row);
+                 replaced = true;
+                 break;
+             }
+         }
+         if (!replaced) {
+             rows.add(row);
+         }
+         writeFile(fileName, rows);
+     }
+
+     /**
+      * Rewrites every row whose namespace, type and name match the configure, then saves the file.
+      * Rows are read through {@link #loadFileLine(String)} so the decode hook is applied before the
+      * encode hook in {@link #writeFile(String, List)}.
+      */
+     @Override
+     protected void updateConfigurable(IConfigurable configure) {
+         if (configure == null) {
+             LOG.warn("update configure is null");
+             return;
+         }
+
+         List<String> rows = loadFileLine(fileName);
+         if (rows == null) {
+             rows = new ArrayList<>();
+         }
+         String updatedRow = configure2String(configure);
+         for (int i = 0; i < rows.size(); i++) {
+             Configure oldConfigure = convert(rows.get(i));
+             if (configure.getNameSpace().equals(oldConfigure.getNameSpace()) && configure.getType()
+                 .equals(oldConfigure.getType()) && configure.getConfigureName().equals(oldConfigure.getName())) {
+                 rows.set(i, updatedRow);
+             }
+         }
+         writeFile(fileName, rows);
+
+     }
+
+     /**
+      * Parses one row into a {@link Configure}. Missing columns and the literal {@code "null"} become
+      * null values.
+      *
+      * @param row a line as produced by {@link #configure2String(IConfigurable)}
+      * @return the parsed configure
+      */
+     protected Configure convert(String row) {
+         String[] values = row.split(SIGN);
+         String namespace = getColumnValue(values, 0, "namespace");
+         String type = getColumnValue(values, 1, "type");
+         String name = getColumnValue(values, 2, "name");
+         String jsonValue = getColumnValue(values, 3, "json_value");
+         String createDate = getColumnValue(values, 4, "gmt_create");
+         String modifiedDate = getColumnValue(values, 5, "gmt_modified");
+         String id = getColumnValue(values, 6, "id");
+         Configure configure = new Configure();
+         configure.setNameSpace(namespace);
+         configure.setType(type);
+         configure.setName(name);
+         configure.setJsonValue(jsonValue);
+         configure.setGmtCreate(DateUtil.parse(createDate));
+         // The modified date goes to its own field instead of overwriting the create date.
+         configure.setGmtModified(DateUtil.parse(modifiedDate));
+         configure.setId((id == null ? null : Long.valueOf(id)));
+
+         return configure;
+     }
+
+     /**
+      * Parses every row with {@link #convert(String)}.
+      */
+     protected List<Configure> convert2Configure(List<String> rows) {
+         List<Configure> configures = new ArrayList<Configure>();
+         for (String row : rows) {
+             configures.add(convert(row));
+         }
+         return configures;
+     }
+
+     /**
+      * Returns the column at index {@code i}.
+      *
+      * @param values columns of a row
+      * @param i column index
+      * @param columnName descriptive column name; not used for the lookup
+      * @return the column value, or null when the column is absent or holds the literal "null"
+      */
+     protected String getColumnValue(String[] values, int i, String columnName) {
+         if (values == null || values.length <= i) {
+             return null;
+         }
+         if ("null".equals(values[i])) {
+             return null;
+         }
+         return values[i];
+     }
+
+     /**
+      * Loads the rows of the file into memory, passing them through the decode hook.
+      *
+      * @param fileName file to read
+      * @return the rows; an empty list when the file yields none
+      */
+     protected List<String> loadFileLine(String fileName) {
+         List<String> rows = FileUtil.loadFileLine(fileName);
+         if (rows == null) {
+             rows = new ArrayList<>();
+         }
+         return doDecRowList(rows);
+     }
+
+     /**
+      * Passes the rows through the encode hook and writes them to the file.
+      */
+     protected void writeFile(String fileName, List<String> rows) {
+         List<String> rowList = doEncryptRowList(rows);
+         FileUtil.write(fileName, rowList);
+     }
+
+     /** Encode hook; currently returns the rows unchanged. */
+     private List<String> doEncryptRowList(List<String> rows) {
+         return rows;
+     }
+
+     /** Decode hook; currently returns the rows unchanged. */
+     private List<String> doDecRowList(List<String> rows) {
+         return rows;
+     }
+
+     /**
+      * Serialises a configurable into one row. Dates and id come from the configurable when it is an
+      * {@link Entity}, otherwise from a fresh {@link Entity}.
+      */
+     protected String configure2String(IConfigurable configure) {
+         Entity entity;
+         if (configure instanceof Entity) {
+             entity = (Entity)configure;
+         } else {
+             entity = new Entity();
+         }
+         return MapKeyUtil.createKeyBySign(SIGN, configure.getNameSpace(), configure.getType(),
+             configure.getConfigureName(), configure.toJson(), DateUtil.format(entity.getGmtCreate()),
+             DateUtil.format(entity.getGmtModified()), entity.getId() + "");
+     }
+
+     public String getFileName() {
+         return fileName;
+     }
+
+     /**
+      * Refreshes this service's namespace from the file and returns the configurables of the type.
+      */
+     @Override
+     public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+         refreshConfigurable(getNamespace());
+         return queryConfigurableByType(type);
+     }
+ }
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileSupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileSupportParentConfigureService.java
new file mode 100644
index 0000000..bf8d151
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileSupportParentConfigureService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service.impl;
+
+import org.apache.rocketmq.streams.configuable.service.AbstractSupportParentConfigureService;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configuable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import com.google.auto.service.AutoService;
+
+import java.util.Properties;
+
+ /**
+  * File-backed variant: both the own and the parent delegate are {@link FileConfigureService}
+  * instances built from the same properties.
+  */
+ @AutoService(IConfigurableService.class)
+ @ServiceName(ConfigurableServcieType.FILE_SERVICE_NAME)
+ public class FileSupportParentConfigureService extends AbstractSupportParentConfigureService {
+
+     /**
+      * Creates the two file-backed delegates.
+      *
+      * @param property properties handed to both delegates
+      */
+     @Override
+     protected void initBeforeInitConfigurable(Properties property) {
+         // Use the argument rather than the inherited field so the hook does not depend on
+         // initMethod having populated that field first.
+         this.configureService = new FileConfigureService(property);
+         this.parentConfigureService = new FileConfigureService(property);
+     }
+
+ }
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemoryConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemoryConfigureService.java
new file mode 100644
index 0000000..e846e44
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemoryConfigureService.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.configuable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+
+ /**
+  * Configurable service that keeps configurables in a static in-memory map keyed by namespace. The
+  * map is shared by every instance of this class.
+  */
+ public class MemoryConfigureService extends AbstractConfigurableService {
+
+     /** namespace -> configurables; static, hence shared by all instances. */
+     private static final Map<String, List<IConfigurable>> namespace2Configure = new HashMap<>();
+
+     public MemoryConfigureService(Properties properties) {
+         super(properties);
+     }
+
+     /**
+      * Returns copies of the configurables stored for the namespace. Each copy is a new instance of
+      * the same class, populated from the stored object's JSON, namespace and configure name.
+      *
+      * @param namespace namespace to load
+      * @return a successful result; its configurables are null when nothing is stored for the namespace
+      */
+     @Override
+     protected GetConfigureResult loadConfigurable(String namespace) {
+         GetConfigureResult result = new GetConfigureResult();
+         result.setQuerySuccess(true);
+         List<IConfigurable> stored = namespace2Configure.get(namespace);
+         List<IConfigurable> copies = null;
+         if (stored != null) {
+             copies = new ArrayList<>();
+             // Iterate over a snapshot of the stored list.
+             for (IConfigurable configurable : new ArrayList<>(stored)) {
+                 IConfigurable copy = ReflectUtil.forInstance(configurable.getClass());
+                 copy.toObject(configurable.toJson());
+                 copy.setNameSpace(configurable.getNameSpace());
+                 copy.setConfigureName(configurable.getConfigureName());
+                 copies.add(copy);
+             }
+         }
+         result.setConfigurables(copies);
+         return result;
+     }
+
+     /**
+      * Stores the configurable under its namespace, replacing an entry with the same type and
+      * configure name. A null argument is ignored.
+      */
+     @Override
+     protected void insertConfigurable(IConfigurable configurable) {
+         if (configurable == null) {
+             return;
+         }
+
+         String namespace = configurable.getNameSpace();
+         List<IConfigurable> list;
+         // The map is static, so the lock must be shared by all instances; locking on "this"
+         // would not exclude other instances.
+         synchronized (namespace2Configure) {
+             list = namespace2Configure.get(namespace);
+             if (list == null) {
+                 list = new ArrayList<>();
+                 namespace2Configure.put(namespace, list);
+             }
+         }
+         int removeIndex = -1;
+         for (int i = 0; i < list.size(); i++) {
+             IConfigurable config = list.get(i);
+             if (config.getType().equals(configurable.getType()) && config.getConfigureName()
+                 .equals(configurable.getConfigureName())) {
+                 removeIndex = i;
+             }
+         }
+         if (configurable instanceof AbstractConfigurable) {
+             ((AbstractConfigurable)configurable).setConfigurableService(this);
+         }
+         if (removeIndex != -1) {
+             list.remove(removeIndex);
+         }
+         list.add(configurable);
+     }
+
+     /**
+      * Replaces the stored entry that has the same type and configure name. A null argument is
+      * ignored, mirroring {@link #insertConfigurable(IConfigurable)}.
+      *
+      * @throws RuntimeException when no such entry exists in the configurable's namespace
+      */
+     @Override
+     protected void updateConfigurable(IConfigurable configure) {
+         if (configure == null) {
+             return;
+         }
+         List<IConfigurable> list = namespace2Configure.get(configure.getNameSpace());
+         if (list == null || list.size() == 0) {
+             throw notFound(configure);
+         }
+         for (int i = 0; i < list.size(); i++) {
+             IConfigurable config = list.get(i);
+             if (config.getType().equals(configure.getType()) && config.getConfigureName()
+                 .equals(configure.getConfigureName())) {
+                 list.set(i, configure);
+                 return;
+             }
+         }
+         throw notFound(configure);
+     }
+
+     /**
+      * Refreshes this service's namespace from the map and returns the configurables of the type.
+      */
+     @Override
+     public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+         refreshConfigurable(getNamespace());
+         return queryConfigurableByType(type);
+     }
+
+     /** Builds the exception thrown when an update targets an entry that is not stored. */
+     private static RuntimeException notFound(IConfigurable configure) {
+         return new RuntimeException(
+             "not have exist configure " + configure.getNameSpace() + "," + configure.getType() + ","
+                 + configure.getConfigureName());
+     }
+ }
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemorySupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemorySupportParentConfigureService.java
new file mode 100644
index 0000000..528d087
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemorySupportParentConfigureService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service.impl;
+
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.configuable.service.AbstractSupportParentConfigureService;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configuable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import com.google.auto.service.AutoService;
+
+
+ /**
+  * In-memory variant: both the parent and the own delegate are {@link MemoryConfigureService}
+  * instances built from the same properties.
+  */
+ @AutoService(IConfigurableService.class)
+ @ServiceName(ConfigurableServcieType.MEMORY_SERVICE_NAME)
+ public class MemorySupportParentConfigureService extends AbstractSupportParentConfigureService {
+
+     /**
+      * Creates the two in-memory delegates.
+      *
+      * @param property properties handed to both delegates
+      */
+     @Override
+     protected void initBeforeInitConfigurable(Properties property) {
+         // Use the argument rather than the inherited field so the hook does not depend on
+         // initMethod having populated that field first.
+         this.parentConfigureService = new MemoryConfigureService(property);
+         this.configureService = new MemoryConfigureService(property);
+     }
+ }
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
new file mode 100644
index 0000000..91b3171
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.component.ConfigureDescriptor;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.configurable.service.ConfigurableServiceFactory;
+import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Manages Configurable objects in one place: unified query, insert and update. insert/update write the configurable
+ * object to storage; file storage (file), in-memory storage (memory) and database storage (DB) are supported and can
+ * be selected with the ConfigureFileKey.CONNECT_TYPE key. Queries are served from memory; objects are loaded into
+ * memory periodically, and the load period (in seconds) can be set in the properties file with the
+ * ConfigureFileKey.POLLING_TIME key. Newly loaded objects take effect; an object that already exists is replaced only
+ * when its updateFlag changes.
+ */
+public class ConfigurableComponent extends AbstractComponent<IConfigurableService>
+ implements IConfigurableService {
+
+ private static final Log LOG = LogFactory.getLog(ConfigurableComponent.class);
+
+ // Backing service created by initProperties; the query/insert/update methods below forward to it.
+ protected volatile IConfigurableService configureService = null;
+
+ // Namespace last passed to startComponent; getNamespace() falls back to it.
+ protected transient String namespace;
+
+ /**
+ * Calls initConfigurableServiceDescriptor() and registers a CONNECT_TYPE descriptor built with
+ * ConfigurableServcieType.DEFAULT_SERVICE_NAME (presumably the default connect type; confirm against
+ * ConfigureDescriptor).
+ */
+ public ConfigurableComponent() {
+ initConfigurableServiceDescriptor();
+ addConfigureDescriptor(
+ new ConfigureDescriptor(CONNECT_TYPE, false, ConfigurableServcieType.DEFAULT_SERVICE_NAME));
+ }
+
+ /**
+ * Returns the ConfigurableComponent that ComponentCreator supplies for the namespace.
+ *
+ * @param namespace namespace handed to ComponentCreator
+ */
+ public static ConfigurableComponent getInstance(String namespace) {
+ return ComponentCreator.getComponent(namespace, ConfigurableComponent.class);
+ }
+
+ /**
+ * Creates the backing service through ConfigurableServiceFactory unless one is already set.
+ *
+ * @param properties properties handed to the factory
+ * @return true when a backing service is set; false when creation threw (the exception is logged)
+ */
+ @Override
+ protected boolean initProperties(Properties properties) {
+ try {
+ if (configureService != null) {
+ return true;
+ }
+ this.configureService = ConfigurableServiceFactory.createConfigurableService(properties);
+ return true;
+ } catch (Exception e) {
+ LOG.error("ConfigurableComponent create error,properties= " + properties, e);
+ return false;
+ }
+
+ }
+
+ /**
+ * Records the namespace and asks the backing service to initialise its configurables.
+ *
+ * @param namespace namespace to load
+ * @return true on success; false when initialisation threw (the exception is logged)
+ */
+ @Override
+ public boolean startComponent(String namespace) {
+ try {
+ this.namespace = namespace;
+ configureService.initConfigurables(namespace);
+ return true;
+ } catch (Exception e) {
+ LOG.error("ConfigurableComponent init error, namespace is " + namespace, e);
+ return false;
+ }
+
+ }
+
+ /**
+ * Starts test mode: configurable data is stored in and loaded from the in-memory store. Implemented by setting the
+ * CONNECT_TYPE system property to ConfigurableServcieType.MEMORY_SERVICE_NAME.
+ */
+ public static void begineTestMode() {
+ System.setProperty(ConfigurableComponent.CONNECT_TYPE, ConfigurableServcieType.MEMORY_SERVICE_NAME);
+ }
+
+ /**
+ * Ends test mode: configurable data is loaded again according to the properties configured in the configuration
+ * file. Implemented by clearing the CONNECT_TYPE system property.
+ */
+ public static void endTestMode() {
+ System.clearProperty(ConfigurableComponent.CONNECT_TYPE);
+ }
+
+ /**
+ * Always returns true; nothing is released here.
+ */
+ @Override
+ public boolean stop() {
+ return true;
+ }
+
+ /**
+ * Returns the backing service, or null when it has not been created yet.
+ */
+ @Override
+ public IConfigurableService getService() {
+ return configureService;
+ }
+
+ // Forwards to the backing service.
+ @Override
+ public void initConfigurables(String namespace) {
+ configureService.initConfigurables(namespace);
+ }
+
+ // Forwards to the backing service.
+ @Override
+ public boolean refreshConfigurable(String namespace) {
+ return configureService.refreshConfigurable(namespace);
+ }
+
+ /**
+ * Refreshes the configurables of the namespace; the boolean result of the refresh is discarded.
+ */
+ public void mockConfigurable(String namespace) {
+ refreshConfigurable(namespace);
+
+ }
+
+ // Forwards to the backing service.
+ @Override
+ public List<IConfigurable> queryConfigurable(String type) {
+ return configureService.queryConfigurable(type);
+ }
+
+ // Forwards to the backing service.
+ @Override
+ public <T extends IConfigurable> List<T> queryConfigurableByType(String type) {
+ return configureService.queryConfigurableByType(type);
+ }
+
+ // Forwards to the backing service.
+ @Override
+ public IConfigurable queryConfigurableByIdent(String type, String name) {
+ return configureService.queryConfigurableByIdent(type, name);
+ }
+
+ // Forwards to the backing service.
+ @Override
+ public IConfigurable queryConfigurableByIdent(String identification) {
+ return configureService.queryConfigurableByIdent(identification);
+ }
+
+ /**
+ * Inserts through the backing service, then calls ConfigurableUtil.refreshMock for the same configurable.
+ */
+ @Override
+ public void insert(IConfigurable configurable) {
+ configureService.insert(configurable);
+ ConfigurableUtil.refreshMock(configurable);
+ }
+
+ // Forwards to the backing service; unlike insert, refreshMock is not called here.
+ @Override
+ public void update(IConfigurable configurable) {
+ configureService.update(configurable);
+ }
+
+ // Forwards to the backing service.
+ @Override
+ public <T> Map<String, T> queryConfigurableMapByType(String type) {
+ return configureService.queryConfigurableMapByType(type);
+ }
+
+ /**
+ * Same lookup as queryConfigurableByIdent(type, name), with the result cast to the caller's type.
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T queryConfigurable(String configurableType, String name) {
+ return (T)queryConfigurableByIdent(configurableType, name);
+ }
+
+ /**
+ * Returns the backing service's namespace when that service is an AbstractConfigurableService, otherwise the
+ * namespace recorded by startComponent.
+ */
+ @Override
+ public String getNamespace() {
+ if (AbstractConfigurableService.class.isInstance(configureService)) {
+ return ((AbstractConfigurableService)configureService).getNamespace();
+ }
+ return namespace;
+ }
+
+ // Forwards to the backing service.
+ @Override
+ public Collection<IConfigurable> findAll() {
+ return configureService.findAll();
+ }
+
+ // Forwards to the backing service.
+ @Override
+ public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+ return configureService.loadConfigurableFromStorage(type);
+ }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/model/Configure.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/model/Configure.java
new file mode 100644
index 0000000..bb4531f
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/model/Configure.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.model;
+
+import org.apache.rocketmq.streams.common.model.Entity;
+
+/**
+ * When configurables are stored in a database, this is the object mapped to the database table.
+ */
+public class Configure extends Entity {
+
+ private static final long serialVersionUID = 5668017348345235669L;
+
+ private String nameSpace;
+ private String type;
+ private String name;
+ private String jsonValue;
+ private String modifyTime;
+ private String remark;
+ private int openRange;
+
+ /**
+ * Builds the CREATE TABLE IF NOT EXISTS statement for the configure table.
+ *
+ * NOTE(review): tableName is concatenated into the statement without escaping, so it must be a trusted
+ * identifier. The leading SQL comment always reads "dipper_configure" regardless of tableName, and the statement
+ * carries a fixed AUTO_INCREMENT start value.
+ *
+ * @param tableName name of the table to create
+ * @return the DDL statement text
+ */
+ public static String createTableSQL(String tableName) {
+ return "/******************************************/\n"
+ + "/* TableName = dipper_configure */\n"
+ + "/******************************************/\n"
+ + "CREATE TABLE IF NOT EXISTS `" + tableName + "` (\n"
+ + " `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',\n"
+ + " `gmt_create` datetime NOT NULL COMMENT '创建时间',\n"
+ + " `gmt_modified` datetime NOT NULL COMMENT '修改时间',\n"
+ + " `namespace` varchar(32) NOT NULL COMMENT '项目标识',\n"
+ + " `type` varchar(32) NOT NULL COMMENT '配置类型',\n"
+ + " `name` varchar(128) NOT NULL COMMENT '配置名称',\n"
+ + " `json_value` text NOT NULL COMMENT '配置内容',\n"
+ + " `status` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '1:正在使用 0:已失效',\n"
+ + " PRIMARY KEY (`id`),\n"
+ + " UNIQUE KEY `uk_namespace_type_name` (`namespace`,`type`,`name`),\n"
+ + " KEY `idx_namespace` (`namespace`)\n"
+ + ") ENGINE=InnoDB AUTO_INCREMENT=1814834 DEFAULT CHARSET=utf8 COMMENT='统一接入配置项'\n"
+ + ";";
+ }
+
+ public String getNameSpace() {
+ return nameSpace;
+ }
+
+ public void setNameSpace(String nameSpace) {
+ this.nameSpace = nameSpace;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getJsonValue() {
+ return jsonValue;
+ }
+
+ public void setJsonValue(String jsonValue) {
+ this.jsonValue = jsonValue;
+ }
+
+ public String getModifyTime() {
+ return modifyTime;
+ }
+
+ public void setModifyTime(String modifyTime) {
+ this.modifyTime = modifyTime;
+ }
+
+ public String getRemark() {
+ return remark;
+ }
+
+ public void setRemark(String remark) {
+ this.remark = remark;
+ }
+
+ public int getOpenRange() {
+ return openRange;
+ }
+
+ public void setOpenRange(int openRange) {
+ this.openRange = openRange;
+ }
+
+ // Prints the fields declared in this class; fields inherited from Entity are not included.
+ @Override
+ public String toString() {
+ return "Configure{" + "nameSpace='" + nameSpace + '\'' + ", type='" + type + '\'' + ", name='" + name + '\''
+ + ", jsonValue='" + jsonValue + '\'' + ", modifyTime='" + modifyTime + '\'' + ", remark='" + remark + '\''
+ + ", openRange=" + openRange + '}';
+ }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
new file mode 100644
index 0000000..85ef626
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.datatype.DataType;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.common.utils.*;
+import org.apache.rocketmq.streams.configurable.model.Configure;
+
+import java.util.*;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractConfigurableService implements IConfigurableService {
+
+ private static final Log LOG = LogFactory.getLog(AbstractConfigurableService.class);
+
+ private static final String CLASS_NAME = IConfigurableService.CLASS_NAME;
+
+ protected Map<String, List<IConfigurable>> type2ConfigurableMap = new HashMap<>();
+
+ protected Map<String, IConfigurable> name2ConfigurableMap = new HashMap<>();
+
+ protected Map<String, IConfigurable> configurableMap = new HashMap<>();
+
+ protected Properties properties;
+
+ protected transient String namespace;
+
+ public AbstractConfigurableService(Properties properties) {
+ this.properties = properties;
+ }
+
+ public AbstractConfigurableService() {
+ }
+
+ /**
+ * Looks up a configurable in {@code name2ConfigurableMap} by its identification key.
+ *
+ * @param identification the map key; entries are stored under {@code MapKeyUtil.createKey(type, name)}
+ * @return the matching configurable, or null when the key is not present
+ */
+ @Override
+ public IConfigurable queryConfigurableByIdent(String identification) {
+ return name2ConfigurableMap.get(identification);
+ }
+
+ /**
+ * Builds the key used by {@code configurableMap} from namespace, type and name via MapKeyUtil.createKey.
+ */
+ protected String getConfigureKey(String nameSpace, String type, String name) {
+ return MapKeyUtil.createKey(nameSpace, type, name);
+ }
+
+ /**
+ * Stores the configurable in {@code configurableMap} under its namespace/type/name key; a null argument is ignored.
+ *
+ * @param configurable the configurable to cache, may be null
+ */
+ protected void updateConfiguresCache(IConfigurable configurable) {
+ if (configurable == null) {
+ return;
+ }
+ // NOTE(review): the JSON result is discarded; presumably called for a side effect of toJson(); confirm.
+ configurable.toJson();
+ String key = getConfigureKey(configurable.getNameSpace(), configurable.getType(), configurable.getConfigureName());
+ configurableMap.put(key, configurable);
+ }
+
+ /**
+ * Caches every element of the list through the single-object overload; null elements are skipped there.
+ *
+ * @param configureList configurables to cache; must not be null
+ */
+ protected void updateConfiguresCache(List<IConfigurable> configureList) {
+ for (IConfigurable iConfigurable : configureList) {
+ updateConfiguresCache(iConfigurable);
+ }
+ }
+
+ /**
+ * Tells whether the cached configurable stored under {@code key} matches the new one with the same key.
+ *
+ * @param key namespace/type/name key as produced by getConfigureKey
+ * @param newConfigureList freshly loaded configurables; every element is cast to IConfigurable
+ * @return the result of ConfigurableUtil.compare for the first element whose key equals {@code key} and that has a
+ * cached counterpart; false when there is no such element
+ */
+ protected boolean equals(String key, List<?> newConfigureList) {
+     for (Object candidate : newConfigureList) {
+         IConfigurable fresh = (IConfigurable)candidate;
+         String candidateKey = getConfigureKey(fresh.getNameSpace(), fresh.getType(), fresh.getConfigureName());
+         if (!key.equals(candidateKey)) {
+             continue;
+         }
+         IConfigurable cached = configurableMap.get(key);
+         if (cached != null) {
+             return ConfigurableUtil.compare(cached, fresh);
+         }
+     }
+     return false;
+ }
+
+ /**
+ * Returns the configurables registered for a type, cast to the caller's element type.
+ *
+ * @param type configurable type
+ * @return a new list; empty when nothing is registered for the type
+ */
+ @Override
+ public <T extends IConfigurable> List<T> queryConfigurableByType(String type) {
+     List<T> typed = new ArrayList<T>();
+     List<IConfigurable> found = queryConfigurable(type);
+     if (found != null) {
+         for (IConfigurable item : found) {
+             typed.add((T)item);
+         }
+     }
+     return typed;
+ }
+
+ /**
+ * Reloads the configurables of a namespace and swaps them into the lookup maps.
+ *
+ * The namespace is recorded first, then loadConfigurable is called. When the load succeeded and returned a list,
+ * checkAndUpdateConfigurables rebuilds the maps, and every configurable it reports as new or changed that
+ * implements IAfterConfiguableRefreshListerner is notified.
+ *
+ * @param namespace namespace to load
+ * @return true when the load succeeded and the maps were rebuilt; false otherwise (the maps are left untouched)
+ */
+ @Override
+ public boolean refreshConfigurable(String namespace) {
+ this.namespace = namespace;
+ // Filled by checkAndUpdateConfigurables, which also installs them as the live maps.
+ Map<String, List<IConfigurable>> tempType2ConfigurableMap = new HashMap<>();
+ Map<String, IConfigurable> tempName2ConfigurableMap = new HashMap<>();
+ GetConfigureResult configures = loadConfigurable(namespace);
+ if (configures != null && configures.isQuerySuccess() && configures.getConfigurables() != null) {
+ List<IConfigurable> configurables = configures.getConfigurables();
+ List<IConfigurable> configurableList = checkAndUpdateConfigurables(namespace, configurables,
+ tempType2ConfigurableMap, tempName2ConfigurableMap,
+ configures.getConfigurables());
+ for (IConfigurable configurable : configurableList) {
+ if (configurable instanceof IAfterConfiguableRefreshListerner) {
+ ((IAfterConfiguableRefreshListerner)configurable).doProcessAfterRefreshConfigurable(this);
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Same lookup as queryConfigurableByIdent(type, name), with the result cast to the caller's type.
+ */
+ @Override
+ public <T> T queryConfigurable(String configurableType, String name) {
+ return (T)queryConfigurableByIdent(configurableType, name);
+ }
+
+ /**
+ * Registers the loaded configurables in the temporary maps, destroys the ones that disappeared, and installs the
+ * temporary maps as the live maps.
+ *
+ * @param namespace not read in this method
+ * @param configurables freshly loaded configurables
+ * @param tempType2ConfigurableMap type map to fill; becomes {@code type2ConfigurableMap}
+ * @param tempName2ConfigurableMap name map to fill; becomes {@code name2ConfigurableMap}
+ * @param configures not read in this method
+ * @return the configurables that the three-argument update reported as new or changed
+ */
+ protected List<IConfigurable> checkAndUpdateConfigurables(String namespace, List<IConfigurable> configurables,
+ Map<String, List<IConfigurable>> tempType2ConfigurableMap,
+ Map<String, IConfigurable> tempName2ConfigurableMap,
+ List configures) {
+ List<IConfigurable> configurableList = new ArrayList<>();
+ for (IConfigurable configurable : configurables) {
+ try {
+ boolean isUpdate = update(configurable, tempName2ConfigurableMap, tempType2ConfigurableMap);
+ if (isUpdate) {
+ configurableList.add(configurable);
+ }
+ } catch (Exception e) {
+ // A failing configurable is logged and skipped so that the remaining ones are still processed.
+ LOG.error("组件初始化异常:" + e.getMessage() + ",name=" + configurable.getConfigureName(), e);
+ }
+ }
+ // Must run before the swap: it compares the still-live name map with the new one.
+ destroyOldConfigurables(tempName2ConfigurableMap);
+ this.name2ConfigurableMap = tempName2ConfigurableMap;
+ this.type2ConfigurableMap = tempType2ConfigurableMap;
+ return configurableList;
+ }
+
+ /**
+ * Destroys every configurable of the live name map whose key is absent from the new name map.
+ *
+ * @param tempName2ConfigurableMap the name map built by the current refresh
+ */
+ private void destroyOldConfigurables(Map<String, IConfigurable> tempName2ConfigurableMap) {
+     for (Map.Entry<String, IConfigurable> current : this.name2ConfigurableMap.entrySet()) {
+         if (tempName2ConfigurableMap.containsKey(current.getKey())) {
+             continue;
+         }
+         destroyOldConfigurable(current.getValue());
+     }
+ }
+
+ /**
+ * Calls destroy() on the configurable when it is an AbstractConfigurable and removes it from
+ * {@code configurableMap}.
+ *
+ * @param oldConfigurable the configurable being replaced or dropped
+ */
+ private void destroyOldConfigurable(IConfigurable oldConfigurable) {
+     if (oldConfigurable instanceof AbstractConfigurable) {
+         AbstractConfigurable destroyable = (AbstractConfigurable)oldConfigurable;
+         destroyable.destroy();
+     }
+     String cacheKey = getConfigureKey(oldConfigurable.getNameSpace(), oldConfigurable.getType(),
+         oldConfigurable.getConfigureName());
+     configurableMap.remove(cacheKey);
+ }
+
+ /**
+ * Hands this service to the configurable when it is an AbstractConfigurable, then calls its init().
+ *
+ * @param configurable the configurable to initialise
+ */
+ protected void initConfigurable(IConfigurable configurable) {
+     if (configurable instanceof AbstractConfigurable) {
+         ((AbstractConfigurable)configurable).setConfigurableService(this);
+     }
+     configurable.init();
+ }
+
+ /**
+ * For internal use only.
+ */
+ private ScheduledExecutorService scheduledExecutorService;
+
+ /**
+ * Loads the configurables of the namespace once and, when a positive polling interval is configured under
+ * AbstractComponent.POLLING_TIME, schedules a periodic reload with that interval in seconds.
+ *
+ * A scheduler left over from an earlier call is shut down before a new one is created, so repeated calls no longer
+ * accumulate thread pools.
+ *
+ * @param namespace namespace to load and keep refreshed
+ * @throws NumberFormatException when the configured polling interval is not a valid long
+ */
+ @Override
+ public void initConfigurables(final String namespace) {
+     refreshConfigurable(namespace);
+     long pollingSeconds = -1;
+     if (this.properties != null) {
+         String pollingTimeStr = this.properties.getProperty(AbstractComponent.POLLING_TIME);
+         if (StringUtil.isNotEmpty(pollingTimeStr)) {
+             pollingSeconds = Long.parseLong(pollingTimeStr);
+         }
+     }
+     if (pollingSeconds <= 0) {
+         return;
+     }
+     // Overwriting the field without this would leave the previous pool's threads running forever.
+     if (scheduledExecutorService != null) {
+         scheduledExecutorService.shutdown();
+     }
+     scheduledExecutorService = new ScheduledThreadPoolExecutor(3);
+     scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
+
+         @Override
+         public void run() {
+             try {
+                 refreshConfigurable(namespace);
+             } catch (Exception e) {
+                 // Swallowed on purpose: an exception escaping run() would cancel all further reloads.
+                 LOG.error("Load configurables error:" + e.getMessage(), e);
+             }
+         }
+     }, pollingSeconds, pollingSeconds, TimeUnit.SECONDS);
+ }
+ // @Override
+ // public List<IConfigurable> queryConfigurable(String nameSpace) {
+ // return namespace2ConfigurableMap.get(nameSpace);
+ // }
+
+ /**
+ * Returns the configurables registered for a type.
+ *
+ * @param type configurable type
+ * @return the list held in {@code type2ConfigurableMap} itself (not a copy), or null when the type is unknown
+ */
+ @Override
+ public List<IConfigurable> queryConfigurable(String type) {
+ String key = MapKeyUtil.createKey(type);
+ return type2ConfigurableMap.get(key);
+ }
+
+ /**
+ * Looks up a configurable by type and name.
+ *
+ * @param type configurable type
+ * @param name configurable name
+ * @return the matching configurable, or null when none is registered under that type/name key
+ */
+ @Override
+ public IConfigurable queryConfigurableByIdent(String type, String name) {
+ String key = MapKeyUtil.createKey(type, name);
+ return name2ConfigurableMap.get(key);
+ }
+
+ /**
+ * Loads the configuration of a namespace from the underlying storage.
+ *
+ * @param namespace namespace to load
+ * @return the load result; refreshConfigurable treats a null result, an unsuccessful result or a null list as a
+ * failed load
+ */
+ protected abstract GetConfigureResult loadConfigurable(String namespace);
+
+ /**
+ * Delegates to the storage-specific updateConfigurable; the in-memory lookup maps are not touched here.
+ */
+ @Override
+ public void update(IConfigurable configurable) {
+ updateConfigurable(configurable);
+ }
+
+ // Storage-specific hook invoked by update(IConfigurable).
+ protected abstract void updateConfigurable(IConfigurable configurable);
+
+ // Storage-specific hook invoked by insert(IConfigurable).
+ protected abstract void insertConfigurable(IConfigurable configurable);
+
+ /**
+ * Registers one loaded configurable in the given maps, reusing the live instance when nothing changed.
+ *
+ * If the live name map already holds an entry for the type/name key and the cached version compares equal, the
+ * existing instance is kept. If it differs, the old instance is destroyed and the new one initialised. If there is
+ * no live entry, the new one is initialised. In every case the chosen instance is cached and put into both maps.
+ *
+ * @param configurable the freshly loaded configurable; null is ignored
+ * @param name2ConfigurableMap name map to fill (shadows the field; the live map is read via {@code this})
+ * @param type2ConfigurableMap type map to fill (shadows the field)
+ * @return true when the configurable was newly initialised, false when the live instance was reused or the
+ * argument was null
+ */
+ protected boolean update(IConfigurable configurable, Map<String, IConfigurable> name2ConfigurableMap,
+ Map<String, List<IConfigurable>> type2ConfigurableMap) {
+ if (configurable == null) {
+ return false;
+ }
+
+ boolean isUpdate = false;
+ List<IConfigurable> configurableList = new ArrayList<>();
+ configurableList.add(configurable);
+
+ String nameKey = MapKeyUtil.createKey(configurable.getType(), configurable.getConfigureName());
+ if (this.name2ConfigurableMap.containsKey(nameKey)) {
+ // NOTE(review): this key uses the service's namespace field, while equals() rebuilds the key from the
+ // configurable's own namespace; if the two differ the comparison never matches. Confirm this is intended.
+ String configureKey = getConfigureKey(namespace, configurable.getType(), configurable.getConfigureName());
+ IConfigurable oldConfigurable = this.name2ConfigurableMap.get(nameKey);
+ if (equals(configureKey, configurableList)) {
+ // Unchanged: keep the already initialised instance.
+ configurable = oldConfigurable;
+ } else {
+ destroyOldConfigurable(oldConfigurable);
+ initConfigurable(configurable);
+ isUpdate = true;
+ }
+ } else {
+ initConfigurable(configurable);
+ isUpdate = true;
+ }
+ updateConfiguresCache(configurable);
+ name2ConfigurableMap.put(nameKey, configurable);
+ String typeKey = MapKeyUtil.createKey(configurable.getType());
+ put2Map(type2ConfigurableMap, typeKey, configurable);
+ return isUpdate;
+ }
+
+ /**
+ * Delegates to the storage-specific insertConfigurable; the in-memory lookup maps are not touched here.
+ */
+ @Override
+ public void insert(IConfigurable configurable) {
+ insertConfigurable(configurable);
+ }
+
+ /**
+ * 给一个扣,可以跨命名空间查询数据
+ *
+ * @param namespaces
+ * @return
+ */
+ public List<IConfigurable> queryConfiguableByNamespace(String... namespaces) {
+ List<IConfigurable> configurables = new ArrayList<>();
+ if (namespaces == null || namespaces.length == 0) {
+ return configurables;
+ }
+ for (String namespace : namespaces) {
+ GetConfigureResult result = loadConfigurable(namespace);
+ if (result.querySuccess) {
+ if (result.configurables != null && result.configurables.size() > 0) {
+ configurables.addAll(result.configurables);
+ }
+ } else {
+ throw new RuntimeException("Load configurable error, the namespace is " + namespace);
+ }
+ }
+ return configurables;
+
+ }
+
+ /**
+ * Appends a configurable to the list stored under a key, creating and registering the list first when the key has
+ * none yet.
+ *
+ * @param map map whose values are lists
+ * @param key key of the list to append to
+ * @param configurable element to append
+ */
+ protected void put2Map(Map<String, List<IConfigurable>> map, String key, IConfigurable configurable) {
+     List<IConfigurable> bucket = map.get(key);
+     if (bucket != null) {
+         bucket.add(configurable);
+         return;
+     }
+     bucket = new ArrayList<IConfigurable>();
+     map.put(key, bucket);
+     bucket.add(configurable);
+ }
+
+ /**
+ * Returns all configurables currently registered by name.
+ *
+ * @return the values view of {@code name2ConfigurableMap} (not a copy)
+ */
+ @Override
+ public Collection<IConfigurable> findAll() {
+ return name2ConfigurableMap.values();
+ }
+
+ /**
+ * Converts a configurable into a configure.
+ *
+ * @param configurable the configurable to convert
+ * @return a configure carrying the type, name and namespace; its JSON value is the configurable's JSON with the
+ * class name added under CLASS_NAME, and is left unset when the configurable's JSON is empty
+ */
+ protected Configure createConfigure(IConfigurable configurable) {
+ Configure configure = new Configure();
+ configure.setType(configurable.getType());
+ configure.setName(configurable.getConfigureName());
+ configure.setNameSpace(configurable.getNameSpace());
+ String jsonString = configurable.toJson();
+ if (!StringUtil.isEmpty(jsonString)) {
+ JSONObject jsonObject = JSONObject.parseObject(jsonString);
+ // createConfigurableFromJson reads this entry back to pick the class to instantiate.
+ jsonObject.put(CLASS_NAME, configurable.getClass().getName());
+ configure.setJsonValue(jsonObject.toJSONString());
+ }
+ return configure;
+ }
+
+ /**
+ * Returns the configurables registered for a type, keyed by configure name and cast to the caller's type.
+ *
+ * @param type configurable type
+ * @return a new map; empty when nothing is registered for the type
+ */
+ @Override
+ public <T> Map<String, T> queryConfigurableMapByType(String type) {
+     Map<String, T> byName = new HashMap<String, T>();
+     List<IConfigurable> found = queryConfigurable(type);
+     if (found == null) {
+         return byName;
+     }
+     for (IConfigurable item : found) {
+         byName.put(item.getConfigureName(), (T)item);
+     }
+     return byName;
+ }
+
+ /**
+ * Converts configures into configurables.
+ *
+ * @param configures configures to convert, may be null
+ * @return a new list with the successfully converted configurables; entries that convert to null are left out
+ */
+ protected List<IConfigurable> convert(List<Configure> configures) {
+     List<IConfigurable> converted = new ArrayList<IConfigurable>();
+     if (configures != null) {
+         for (Configure source : configures) {
+             IConfigurable item = convert(source);
+             if (item == null) {
+                 continue;
+             }
+             converted.add(item);
+         }
+     }
+     return converted;
+ }
+
+ /**
+ * Instantiates the class named in the JSON under CLASS_NAME and populates it from the JSON.
+ *
+ * @param namespace namespace to set on the new configurable
+ * @param type type to set on the new configurable
+ * @param name configure name to set on the new configurable
+ * @param jsonValue serialized configurable
+ * @return the populated configurable, or null when the JSON is empty or createConfigurable returned null
+ */
+ protected IConfigurable createConfigurableFromJson(String namespace, String type, String name, String jsonValue) {
+ if (StringUtil.isEmpty(jsonValue)) {
+ return null;
+ }
+ JSONObject jsonObject = JSONObject.parseObject(jsonValue);
+ String className = jsonObject.getString(CLASS_NAME);
+ IConfigurable configurable = createConfigurable(className);
+ if (configurable == null) {
+ return null;
+ }
+ configurable.setConfigureName(name);
+ configurable.setNameSpace(namespace);
+ configurable.setType(type);
+ if (AbstractConfigurable.class.isInstance(configurable)) {
+ AbstractConfigurable abstractConfigurable = (AbstractConfigurable)configurable;
+ abstractConfigurable.setConfigurableService(this);
+ }
+ // Identity and service are set before toObject is called with the JSON.
+ configurable.toObject(jsonValue);
+ return configurable;
+ }
+
+ /**
+ * Extension point that lets callers change the configurable produced for a configure; this implementation
+ * delegates to convertConfigurable.
+ *
+ * @param configure the stored configure
+ * @return the configurable, or null when conversion failed
+ */
+ protected IConfigurable convert(Configure configure) {
+
+ return convertConfigurable(configure);
+ }
+
+ /**
+ * Builds the configurable described by a stored configure.
+ *
+ * The configurable is created from the configure's JSON. When it is an Entity, the id and the creation and
+ * modification timestamps are copied over. convertPost is then applied.
+ *
+ * @param configure the stored configure
+ * @return the configurable, or null when no configurable could be created or an exception occurred (exceptions
+ * are logged)
+ */
+ protected IConfigurable convertConfigurable(Configure configure) {
+     try {
+         IConfigurable configurable = createConfigurableFromJson(configure.getNameSpace(), configure.getType(),
+             configure.getName(), configure.getJsonValue());
+         if (configurable == null) {
+             // Nothing was created (empty JSON or no instance for the class name). Falling through used to hand
+             // null to convertPost, which threw a NullPointerException whenever properties were set.
+             return null;
+         }
+         if (configurable instanceof Entity) {
+             // The Configurable interface is also used by third-party packages, so these setters could not be
+             // added to the interface and live on the abstract class instead; hence the cast.
+             Entity abs = (Entity)configurable;
+             abs.setId(configure.getId());
+             abs.setGmtCreate(configure.getGmtCreate());
+             abs.setGmtModified(configure.getGmtModified());
+         }
+         convertPost(configurable);
+         return configurable;
+     } catch (Exception e) {
+         LOG.error("转换异常:" + configure.toString(), e);
+         return null;
+     }
+ }
+
+ /**
+ * Override point for changing properties of a configurable after it has been created.
+ *
+ * This implementation looks up a property whose key is the configurable's namespace/type/name key. The value is
+ * read as comma-separated {@code fieldName:value} pairs, and each value is converted to the field's type and set
+ * on the configurable. Nothing happens when there are no properties or no such property.
+ *
+ * @param configurable the configurable to adjust
+ */
+ @SuppressWarnings("rawtypes")
+ protected void convertPost(IConfigurable configurable) {
+ if (this.properties == null) {
+ return;
+ }
+ String identification =
+ MapKeyUtil.createKey(configurable.getNameSpace(), configurable.getType(), configurable.getConfigureName());
+ String propertyValue = this.properties.getProperty(identification);
+ if (StringUtil.isEmpty(propertyValue)) {
+ return;
+ }
+ String[] fieldName2Values = propertyValue.split(",");
+ if (fieldName2Values == null || fieldName2Values.length == 0) {
+ return;
+ }
+ for (String fieldName2Value : fieldName2Values) {
+ try {
+ String[] fieldName2ValueArray = fieldName2Value.split(":");
+ // Pairs that do not split into exactly two parts are skipped, including values that contain ':'.
+ if (fieldName2ValueArray == null || fieldName2ValueArray.length != 2) {
+ continue;
+ }
+ String fieldName = fieldName2ValueArray[0];
+ String value = fieldName2ValueArray[1];
+ Class clazz = ReflectUtil.getBeanFieldType(configurable.getClass(), fieldName);
+ DataType dataType = DataTypeUtil.createDataType(clazz, null);
+ if (dataType == null) {
+ continue;
+ }
+ Object fieldValue = dataType.getData(value);
+ ReflectUtil.setBeanFieldValue(configurable, fieldName, fieldValue);
+
+ } catch (Exception e) {
+ // One bad pair is logged and skipped; the remaining pairs are still applied.
+ LOG.error("convert post error " + fieldName2Value, e);
+ continue;
+ }
+
+ }
+ }
+
+ /**
+ * Creates a configurable object of the given class through ReflectUtil.forInstance.
+ *
+ * @param className class name
+ * @return the new instance; createConfigurableFromJson treats a null result as "cannot create"
+ */
+ @SuppressWarnings("rawtypes")
+ protected IConfigurable createConfigurable(String className) {
+ return ReflectUtil.forInstance(className);
+ }
+
+ /**
+ * Result of loadConfigurable: a success flag plus the loaded configurables.
+ */
+ public class GetConfigureResult {
+
+ // False unless a loader sets it; refreshConfigurable ignores unsuccessful results.
+ private boolean querySuccess;
+ // May be null; the callers in this class check for that.
+ private List<IConfigurable> configurables;
+
+ public boolean isQuerySuccess() {
+ return querySuccess;
+ }
+
+ public void setQuerySuccess(boolean querySuccess) {
+ this.querySuccess = querySuccess;
+ }
+
+ public List<IConfigurable> getConfigurables() {
+ return configurables;
+ }
+
+ public void setConfigurables(List<IConfigurable> configurables) {
+ this.configurables = configurables;
+ }
+ }
+
+ /**
+ * Returns the namespace passed to the most recent refreshConfigurable call, or null before the first call.
+ */
+ @Override
+ public String getNamespace() {
+ return namespace;
+ }
+
+ // Returns the properties this service was given; null when none were set.
+ public Properties getProperties() {
+ return properties;
+ }
+
+ // Replaces the properties read by initConfigurables and convertPost.
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java
new file mode 100644
index 0000000..d2d26cd
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * namespace 分层,支持顶级命名空间,顶级命名空间的对象,所有命名空间都可见。顶级命名空间是固定值IConfigurableService.PARENT_CHANNEL_NAME_SPACE
+ */
+public abstract class AbstractSupportParentConfigureService extends AbstractConfigurableService
+ implements IConfigurableService {
+
+ private static final Log LOG = LogFactory.getLog(AbstractSupportParentConfigureService.class);
+ protected IConfigurableService configureService = null;
+ protected IConfigurableService parentConfigureService = null;
+ //protected IConfigurableService shareConfigureService = null;
+ protected Properties properties;
+
+ /**
+  * Creates the service without properties ({@code null} is passed to the superclass constructor).
+  * The properties are supplied afterwards through {@link #initMethod(Properties)}.
+  */
+ public AbstractSupportParentConfigureService() {
+     super(null);
+ }
+
+ /**
+  * Stores the given properties on this instance, then invokes
+  * {@link #initBeforeInitConfigurable(Properties)} with the same object.
+  *
+  * @param property the configuration to remember and to pass on to the hook
+  */
+ public void initMethod(Properties property) {
+     properties = property;
+     initBeforeInitConfigurable(property);
+ }
+
+ /**
+  * Hook called by {@link #initMethod(Properties)} right after the properties have been stored.
+  * NOTE(review): the file- and memory-backed implementations in this module use it to assign
+  * {@code configureService} and {@code parentConfigureService}.
+  *
+  * @param property the properties passed to {@link #initMethod(Properties)}
+  */
+ protected abstract void initBeforeInitConfigurable(Properties property);
+
+ /**
+  * Initializes the configurables of the given namespace and, for any namespace other than the parent
+  * namespace, those of the parent namespace first.
+  *
+  * <p>When the parent namespace itself is initialized, the parent delegate is discarded. A later call
+  * with a different namespace therefore finds no parent delegate; that case is now skipped instead of
+  * failing with a {@link NullPointerException}, matching how the query methods treat a missing parent.
+  *
+  * @param namespace the namespace to initialize
+  */
+ @Override
+ public void initConfigurables(String namespace) {
+     if (IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(namespace)) {
+         // The parent namespace has no parent of its own.
+         parentConfigureService = null;
+     } else if (parentConfigureService != null) {
+         parentConfigureService.initConfigurables(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+     }
+     configureService.initConfigurables(namespace);
+ }
+
+ @Override
+ public boolean refreshConfigurable(String namespace) {
+
+ if (!IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(namespace)) {
+ parentConfigureService.refreshConfigurable(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+ // initShareConfigurableService(namespace);
+ }
+ configureService.refreshConfigurable(namespace);
+ return true;
+ }
+
+ /**
+  * Returns the configurables of the given type from this namespace followed by those of the parent
+  * namespace.
+  *
+  * <p>The result is always a fresh list. The previous implementation appended the parent entries to
+  * the list returned by the delegate, which would modify the delegate's own data if that list is not
+  * a copy.
+  *
+  * @param type the configurable type to look up
+  * @return a new, possibly empty list; never {@code null}
+  */
+ @Override
+ public List<IConfigurable> queryConfigurable(String type) {
+     List<IConfigurable> merged = new ArrayList<>();
+     List<IConfigurable> own = configureService.queryConfigurable(type);
+     if (own != null) {
+         merged.addAll(own);
+     }
+     if (parentConfigureService != null) {
+         List<IConfigurable> inherited = parentConfigureService.queryConfigurable(type);
+         if (inherited != null) {
+             merged.addAll(inherited);
+         }
+     }
+     return merged;
+ }
+
+ /**
+  * Looks a configurable up by type and name, first in this namespace and then in the parent namespace.
+  *
+  * @param type the configurable type
+  * @param name the configurable name
+  * @return the match from this namespace if there is one, otherwise the parent's answer;
+  *         {@code null} when nothing is found or no parent delegate exists
+  */
+ @Override
+ public IConfigurable queryConfigurableByIdent(String type, String name) {
+     IConfigurable own = configureService.queryConfigurableByIdent(type, name);
+     if (own != null || parentConfigureService == null) {
+         return own;
+     }
+     return parentConfigureService.queryConfigurableByIdent(type, name);
+ }
+
+ /**
+  * Looks a configurable up by its identification, first in this namespace and then in the parent
+  * namespace.
+  *
+  * @param identification the identification string understood by the delegates
+  * @return the match from this namespace if there is one, otherwise the parent's answer;
+  *         {@code null} when nothing is found or no parent delegate exists
+  */
+ @Override
+ public IConfigurable queryConfigurableByIdent(String identification) {
+     IConfigurable own = configureService.queryConfigurableByIdent(identification);
+     if (own != null || parentConfigureService == null) {
+         return own;
+     }
+     return parentConfigureService.queryConfigurableByIdent(identification);
+ }
+
+ /**
+  * Inserts the configurable through the parent delegate when it belongs to the parent namespace and
+  * a parent delegate exists; otherwise through the delegate of this namespace.
+  *
+  * <p>The namespace comparison is written constant-first, so a configurable whose namespace is
+  * {@code null} is routed to this namespace's delegate instead of raising a
+  * {@link NullPointerException}.
+  *
+  * @param configurable the configurable to insert
+  */
+ @Override
+ protected void insertConfigurable(IConfigurable configurable) {
+     if (parentConfigureService != null
+         && IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(configurable.getNameSpace())) {
+         parentConfigureService.insert(configurable);
+     } else {
+         configureService.insert(configurable);
+     }
+ }
+
+ /**
+  * Updates the configurable through the parent delegate when it belongs to the parent namespace and
+  * a parent delegate exists; otherwise through the delegate of this namespace.
+  *
+  * <p>The namespace comparison is written constant-first, so a configurable whose namespace is
+  * {@code null} is routed to this namespace's delegate instead of raising a
+  * {@link NullPointerException}.
+  *
+  * @param configurable the configurable to update
+  */
+ @Override
+ protected void updateConfigurable(IConfigurable configurable) {
+     if (parentConfigureService != null
+         && IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(configurable.getNameSpace())) {
+         parentConfigureService.update(configurable);
+     } else {
+         configureService.update(configurable);
+     }
+ }
+
+ @Override
+ public <T> T queryConfigurable(String configurableType, String name) {
+ return (T)queryConfigurableByIdent(configurableType, name);
+ }
+
+ /**
+  * This composite service does not load anything itself; it always returns {@code null}.
+  *
+  * @param namespace ignored
+  * @return {@code null}
+  */
+ @Override
+ protected GetConfigureResult loadConfigurable(String namespace) {
+     return null;
+ }
+
+ //protected void initShareConfigurableService(String namespace) {
+ // if (parentConfigureService == null) {
+ // return;
+ // }
+ // shareConfigureService = new AbstractReadOnlyConfigurableService() {
+ //
+ // @Override
+ // public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+ // refreshConfigurable(namespace);
+ // return queryConfigurableByType(type);
+ // }
+ //
+ // @Override
+ // protected List<IConfigurable> loadConfigurables(String namespace) {
+ // List<IConfigurable> parent = parentConfigureService.queryConfigurable(ShareConfiguable.TYPE);
+ // List<IConfigurable> shareConfigurables = new ArrayList<>();
+ // if (parent == null) {
+ // return shareConfigurables;
+ // }
+ // for (IConfigurable configurable : parent) {
+ // ShareConfiguable shareConfiguable = (ShareConfiguable) configurable;
+ // if (shareConfiguable.getShareAll() || shareConfiguable.getShareNameSpaces().contains(namespace)) {
+ // String sharedNameSpace = shareConfiguable.getSharedNameSpace();
+ // String sharedType = shareConfiguable.getSharedType();
+ // String sharedName = shareConfiguable.getSharedName();
+ // List<IConfigurable> sharedConfigrables =
+ // createAndQueryConfigurable(sharedNameSpace, sharedType, sharedName);
+ // if (sharedConfigrables != null) {
+ // shareConfigurables.addAll(sharedConfigrables);
+ // }
+ // }
+ // }
+ // return shareConfigurables;
+ // }
+ //
+ //
+ // };
+ // shareConfigureService.refreshConfigurable(namespace);
+ //
+ //}
+
+ /**
+  * Builds a fresh service from this instance's properties, refreshes it for {@code sharedNameSpace}
+  * and queries it.
+  *
+  * <p>When {@code sharedName} is given, a single lookup by type and name is performed. A miss now
+  * yields an empty list rather than a list containing a {@code null} element, so callers can iterate
+  * or {@code addAll} the result without tripping over null entries.
+  *
+  * @param sharedNameSpace the namespace to refresh and query
+  * @param sharedType the configurable type to look up
+  * @param sharedName the configurable name; when empty, all configurables of the type are returned
+  * @return the matching configurables
+  */
+ protected List<IConfigurable> createAndQueryConfigurable(String sharedNameSpace, String sharedType,
+     String sharedName) {
+     IConfigurableService sharedService = ConfigurableServiceFactory.createConfigurableService(properties);
+     sharedService.refreshConfigurable(sharedNameSpace);
+     if (StringUtil.isNotEmpty(sharedName)) {
+         List<IConfigurable> configurables = new ArrayList<>();
+         IConfigurable configurable = sharedService.queryConfigurableByIdent(sharedType, sharedName);
+         if (configurable != null) {
+             configurables.add(configurable);
+         }
+         return configurables;
+     }
+     return sharedService.queryConfigurable(sharedType);
+ }
+
+ /**
+  * Collects every configurable of the parent namespace (when a parent delegate exists) followed by
+  * every configurable of this namespace.
+  *
+  * <p>The previous guard {@code tmp != null || tmp.size() > 0} dereferenced {@code tmp} exactly when
+  * it was {@code null}; a plain null check is all that is needed before {@code addAll}.
+  *
+  * @return a new, possibly empty list
+  */
+ @Override
+ public Collection<IConfigurable> findAll() {
+     List<IConfigurable> configurables = new ArrayList<>();
+     if (parentConfigureService != null) {
+         Collection<IConfigurable> inherited = parentConfigureService.findAll();
+         if (inherited != null) {
+             configurables.addAll(inherited);
+         }
+     }
+     Collection<IConfigurable> own = configureService.findAll();
+     if (own != null) {
+         configurables.addAll(own);
+     }
+     return configurables;
+ }
+
+ /**
+  * @return the delegate serving this instance's own namespace
+  */
+ public IConfigurableService getConfigureService() {
+     return this.configureService;
+ }
+
+ /**
+  * Loads the configurables of the given type from the parent delegate (when present) and then from
+  * this namespace's delegate.
+  *
+  * <p>The previous guard {@code tmp != null || tmp.size() > 0} dereferenced {@code tmp} exactly when
+  * it was {@code null}; a plain null check is all that is needed before {@code addAll}.
+  *
+  * @param type the configurable type to load
+  * @return a new, possibly empty list
+  */
+ @Override
+ public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+     List<T> configurables = new ArrayList<>();
+     if (parentConfigureService != null) {
+         Collection<T> inherited = parentConfigureService.loadConfigurableFromStorage(type);
+         if (inherited != null) {
+             configurables.addAll(inherited);
+         }
+     }
+     Collection<T> own = configureService.loadConfigurableFromStorage(type);
+     if (own != null) {
+         configurables.addAll(own);
+     }
+     return configurables;
+ }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServcieType.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServcieType.java
new file mode 100644
index 0000000..a4998ff
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServcieType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+
+ /**
+  * Aliases for the service-name constants declared on {@link IConfigurableService}.
+  *
+  * @deprecated each constant merely forwards to the constant of the same name on
+  *             {@link IConfigurableService}; reference that interface directly.
+  */
+ @Deprecated
+ public class ConfigurableServcieType {
+
+     public static final String DEFAULT_SERVICE_NAME = IConfigurableService.DEFAULT_SERVICE_NAME;
+     public static final String MEMORY_SERVICE_NAME = IConfigurableService.MEMORY_SERVICE_NAME;
+     public static final String FILE_SERVICE_NAME = IConfigurableService.FILE_SERVICE_NAME;
+     public static final String HTTP_SERVICE_NAME = IConfigurableService.HTTP_SERVICE_NAME;
+
+ }
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
new file mode 100644
index 0000000..449f04a
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
+
+import java.util.Properties;
+
+ /**
+  * Factory for {@link IConfigurableService} instances. The implementation is selected by the
+  * {@link #CONFIGURABLE_SERVICE_TYPE} property and resolved through the service loader.
+  */
+ public class ConfigurableServiceFactory {
+     private static ServiceLoaderComponent<IConfigurableService> configurableServiceLoaderComponent =
+         ServiceLoaderComponent.getInstance(IConfigurableService.class);
+     /** Property key naming the service implementation to create. */
+     public static final String CONFIGURABLE_SERVICE_TYPE = "dipper.configurable.service.type";
+     private static final Log LOG = LogFactory.getLog(ConfigurableServiceFactory.class);
+
+     /**
+      * Creates a service of the type named by {@link #CONFIGURABLE_SERVICE_TYPE}, falling back to
+      * {@code IConfigurableService.DEFAULT_SERVICE_NAME} when the property is empty. Services that
+      * extend {@link AbstractSupportParentConfigureService} are initialized with a copy of the
+      * properties.
+      *
+      * @param properties the configuration; its entries are copied, the object itself is not retained
+      * @return the new service, or {@code null} when creation failed (the failure is logged)
+      */
+     public static IConfigurableService createConfigurableService(Properties properties) {
+         try {
+             Properties copy = new Properties();
+             copy.putAll(properties);
+             String type = copy.getProperty(CONFIGURABLE_SERVICE_TYPE);
+             if (StringUtil.isEmpty(type)) {
+                 type = IConfigurableService.DEFAULT_SERVICE_NAME;
+             }
+             IConfigurableService configurableService = getConfigurableServcieType(type);
+             if (configurableService instanceof AbstractSupportParentConfigureService) {
+                 ((AbstractSupportParentConfigureService)configurableService).initMethod(copy);
+             }
+             return configurableService;
+         } catch (Exception e) {
+             LOG.error("create ConfigurableService error", e);
+             return null;
+         }
+     }
+
+     /**
+      * Resolves the service registered under {@code type} and returns a new instance of its class.
+      *
+      * @param type the service name to resolve
+      * @return a new instance of the resolved service class
+      * @throws IllegalArgumentException when no service is registered under {@code type}
+      *         (previously this surfaced as an unexplained {@link NullPointerException})
+      */
+     public static IConfigurableService getConfigurableServcieType(String type) {
+         IConfigurableService configurableService =
+             (IConfigurableService)configurableServiceLoaderComponent.getService().loadService(type);
+         if (configurableService == null) {
+             throw new IllegalArgumentException("no IConfigurableService registered for type " + type);
+         }
+         return ReflectUtil.forInstance(configurableService.getClass().getName());
+     }
+ }
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
new file mode 100644
index 0000000..1a5fe9b
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service.impl;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.configurable.model.Configure;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class FileConfigureService extends AbstractConfigurableService {
+
+ public static final String FILE_PATH_NAME = IConfigurableService.FILE_PATH_NAME;
+ // Path of the configuration file
+ private static final Log LOG = LogFactory.getLog(FileConfigureService.class);
+ private static final String DEFAULT_FILE_NAME = "dipper_configure.cs"; // 默认文件名
+ private static final String SIGN = "&&&&"; // 字段分割附号
+ public String fileName;
+
+ /**
+  * Creates the service and resolves the backing file from the {@code FILE_PATH_NAME} property.
+  *
+  * @param properties the configuration; must not be {@code null} because the property is read here
+  */
+ public FileConfigureService(Properties properties) {
+     super(properties);
+     String configuredPath = properties.getProperty(FILE_PATH_NAME);
+     initService(configuredPath);
+ }
+
+ /**
+  * Determines the file that backs this service. An explicit path wins; otherwise the default file
+  * name is placed in the directory this class was loaded from (the directory containing the jar
+  * when loaded from a jar).
+  *
+  * @param fileAndPath the configured file path, may be empty
+  */
+ protected void initService(String fileAndPath) {
+     if (StringUtil.isEmpty(fileAndPath)) {
+         String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+         if (path.endsWith(".jar")) {
+             // A URL path is always '/'-separated, even where File.separator is '\'. Searching only for
+             // File.separator returned -1 on such platforms and made substring throw.
+             int index = Math.max(path.lastIndexOf('/'), path.lastIndexOf(File.separatorChar));
+             if (index >= 0) {
+                 path = path.substring(0, index);
+             }
+         }
+         fileName = FileUtil.concatFilePath(path, DEFAULT_FILE_NAME);
+     } else {
+         fileName = fileAndPath;
+     }
+     LOG.info("load file from path = " + fileName);
+ }
+
+ /**
+  * Loads the configurables of a namespace from the backing file.
+  *
+  * @param namespace the namespace whose entries are wanted
+  * @return a result whose success flag is {@code false} when anything went wrong while loading;
+  *         the failure is logged once through the class logger
+  */
+ @Override
+ protected GetConfigureResult loadConfigurable(String namespace) {
+     GetConfigureResult result = new GetConfigureResult();
+     try {
+         List<Configure> configures = selectOpening(namespace);
+         List<IConfigurable> configurables = convert(configures);
+         LOG.info("load configure namespace=" + namespace + " count=" + configures.size());
+         result.setConfigurables(configurables);
+         // Marks whether the query succeeded; per the original note, the configuration is not
+         // updated when it did not.
+         result.setQuerySuccess(true);
+     } catch (Exception e) {
+         result.setQuerySuccess(false);
+         // The logger already records the stack trace; printing it to stderr as well was redundant.
+         LOG.error("load configurable error ", e);
+     }
+     return result;
+ }
+
+ /**
+  * Reads every row of the backing file, parses it and keeps the entries of the given namespace.
+  *
+  * @param namespace the namespace to keep
+  * @return the parsed entries belonging to {@code namespace}
+  */
+ protected List<Configure> selectOpening(String namespace) {
+     List<Configure> parsed = convert2Configure(loadFileLine(fileName));
+     return filter(parsed, namespace);
+ }
+
+ /**
+  * Keeps only the entries whose namespace equals {@code namespace}.
+  *
+  * @param configures the entries to filter; {@code null} yields an empty list
+  * @param namespace the namespace to match
+  * @return a new list with the matching entries
+  * @throws RuntimeException when {@code configures} is not {@code null} and {@code namespace} is empty
+  */
+ protected List<Configure> filter(List<Configure> configures, String namespace) {
+     List<Configure> matching = new ArrayList<>();
+     if (configures == null) {
+         return matching;
+     }
+     if (StringUtil.isEmpty(namespace)) {
+         throw new RuntimeException("namespace can not empty ");
+     }
+     for (Configure candidate : configures) {
+         if (namespace.equals(candidate.getNameSpace())) {
+             matching.add(candidate);
+         }
+     }
+     return matching;
+ }
+
+ /**
+  * Writes the configurable to the backing file: a row with the same namespace, type and name is
+  * replaced in place, otherwise a new row is appended. The whole file is rewritten afterwards.
+  *
+  * <p>The configurable is serialized once and that row is reused; the original computed it up to
+  * three times and left one of the results unused.
+  *
+  * @param configure the configurable to store; {@code null} is logged and ignored
+  */
+ @Override
+ protected void insertConfigurable(IConfigurable configure) {
+     if (configure == null) {
+         LOG.warn("insert configure is null");
+         return;
+     }
+     String serialized = configure2String(configure);
+
+     List<String> rows = loadFileLine(fileName);
+     if (rows == null) {
+         rows = new ArrayList<>();
+     }
+     // configures.get(i) is the parsed form of rows.get(i), so the two lists share indices.
+     List<Configure> configures = convert2Configure(rows);
+     String newKey =
+         MapKeyUtil.createKey(configure.getNameSpace(), configure.getType(), configure.getConfigureName());
+     boolean replaced = false;
+     for (int i = 0; i < configures.size(); i++) {
+         Configure existing = configures.get(i);
+         String existingKey = MapKeyUtil.createKey(existing.getNameSpace(), existing.getType(), existing.getName());
+         if (existingKey.equals(newKey)) {
+             rows.set(i, serialized);
+             replaced = true;
+             break;
+         }
+     }
+     if (!replaced) {
+         rows.add(serialized);
+     }
+     writeFile(fileName, rows);
+ }
+
+ /**
+  * Replaces every row of the backing file that has the same namespace, type and name as the given
+  * configurable, then rewrites the file.
+  *
+  * <p>Rows are now read through {@link #loadFileLine(String)}, the counterpart of
+  * {@link #writeFile(String, List)}, instead of calling {@code FileUtil} directly, so the read and
+  * write paths go through the same hooks. The null-argument warning now names the update operation.
+  *
+  * @param configure the configurable to store; {@code null} is logged and ignored
+  */
+ @Override
+ protected void updateConfigurable(IConfigurable configure) {
+     if (configure == null) {
+         LOG.warn("update configure is null");
+         return;
+     }
+
+     List<String> rows = loadFileLine(fileName);
+     if (rows == null) {
+         rows = new ArrayList<>();
+     }
+     String serialized = configure2String(configure);
+     for (int i = 0; i < rows.size(); i++) {
+         Configure oldConfigure = convert(rows.get(i));
+         if (configure.getNameSpace().equals(oldConfigure.getNameSpace()) && configure.getType()
+             .equals(oldConfigure.getType()) && configure.getConfigureName().equals(oldConfigure.getName())) {
+             rows.set(i, serialized);
+         }
+     }
+     writeFile(fileName, rows);
+ }
+
+ /**
+  * Parses one file row into a {@link Configure}. Columns are separated by {@code SIGN} and appear in
+  * the order namespace, type, name, json value, create date, modified date, id. Missing columns and
+  * the literal text {@code null} are read as {@code null}.
+  *
+  * <p>The modified date is stored with {@code setGmtModified}. The original passed it to
+  * {@code setGmtCreate} a second time, which overwrote the create date and never set the modified
+  * date.
+  *
+  * @param row the raw row text
+  * @return the parsed entry
+  */
+ protected Configure convert(String row) {
+     String[] values = row.split(SIGN);
+     String namespace = getColumnValue(values, 0, "namespace");
+     String type = getColumnValue(values, 1, "type");
+     String name = getColumnValue(values, 2, "name");
+     String jsonValue = getColumnValue(values, 3, "json_value");
+     String createDate = getColumnValue(values, 4, "gmt_create");
+     String modifiedDate = getColumnValue(values, 5, "gmt_modified");
+     String id = getColumnValue(values, 6, "id");
+     Configure configure = new Configure();
+     configure.setNameSpace(namespace);
+     configure.setType(type);
+     configure.setName(name);
+     configure.setJsonValue(jsonValue);
+     configure.setGmtCreate(DateUtil.parse(createDate));
+     configure.setGmtModified(DateUtil.parse(modifiedDate));
+     configure.setId((id == null ? null : Long.valueOf(id)));
+
+     return configure;
+ }
+
+ /**
+  * Parses every row with {@link #convert(String)}, preserving order.
+  *
+  * @param rows the raw rows; must not be {@code null}
+  * @return a new list holding one parsed entry per row
+  */
+ protected List<Configure> convert2Configure(List<String> rows) {
+     List<Configure> parsed = new ArrayList<Configure>();
+     Iterator<String> cursor = rows.iterator();
+     while (cursor.hasNext()) {
+         parsed.add(convert(cursor.next()));
+     }
+     return parsed;
+ }
+
+ /**
+  * Returns the column at index {@code i}, treating an absent column and the literal text
+  * {@code null} as {@code null}.
+  *
+  * @param values the split row, may be {@code null} or empty
+  * @param i the column index
+  * @param namespace unused by this method; the callers in this class pass the column's name here
+  * @return the column text or {@code null}
+  */
+ protected String getColumnValue(String[] values, int i, String namespace) {
+     boolean absent = values == null || values.length == 0 || values.length <= i;
+     if (absent) {
+         return null;
+     }
+     String column = values[i];
+     return "null".equals(column) ? null : column;
+ }
+
+ /**
+  * Loads the rows of the file into memory and passes them through the decryption step (which, as
+  * implemented in this class, returns the rows unchanged).
+  *
+  * @param fileName the file to read
+  * @return the rows; an empty list is substituted when the file utility returns {@code null}
+  */
+ protected List<String> loadFileLine(String fileName) {
+     List<String> loaded = FileUtil.loadFileLine(fileName);
+     return doDecRowList(loaded != null ? loaded : new ArrayList<String>());
+ }
+
+ /**
+  * Passes the rows through the encryption step (which, as implemented in this class, returns them
+  * unchanged) and writes them to the file.
+  *
+  * @param fileName the file to write
+  * @param rows the rows to store
+  */
+ protected void writeFile(String fileName, List<String> rows) {
+     FileUtil.write(fileName, doEncryptRowList(rows));
+ }
+
+ /**
+  * Encryption step applied before writing. Currently a pass-through: the argument is returned as is.
+  */
+ private List<String> doEncryptRowList(List<String> rows) {
+     return rows;
+ }
+
+ /**
+  * Decryption step applied after reading. Currently a pass-through: the argument is returned as is.
+  */
+ private List<String> doDecRowList(List<String> rows) {
+     return rows;
+ }
+
+ /**
+  * Serializes a configurable into one file row: namespace, type, name, JSON, create date, modified
+  * date and id, joined with {@code SIGN}. The dates and id come from the configurable when it is an
+  * {@link Entity}; otherwise from a freshly created {@link Entity}.
+  *
+  * @param configure the configurable to serialize
+  * @return the row text
+  */
+ protected String configure2String(IConfigurable configure) {
+     Entity entity = configure instanceof Entity ? (Entity)configure : new Entity();
+     return MapKeyUtil.createKeyBySign(SIGN, configure.getNameSpace(), configure.getType(),
+         configure.getConfigureName(), configure.toJson(), DateUtil.format(entity.getGmtCreate()),
+         DateUtil.format(entity.getGmtModified()), entity.getId() + "");
+ }
+
+ /**
+  * @return the path of the file backing this service, as resolved by {@code initService}
+  */
+ public String getFileName() {
+     return this.fileName;
+ }
+
+ @Override
+ public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+ refreshConfigurable(getNamespace());
+ return queryConfigurableByType(type);
+ }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java
new file mode 100644
index 0000000..aafa805
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service.impl;
+
+import org.apache.rocketmq.streams.configurable.service.AbstractSupportParentConfigureService;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import com.google.auto.service.AutoService;
+
+import java.util.Properties;
+
+ /**
+  * File-backed service with parent-namespace support: both this namespace and the parent namespace
+  * are served by a {@link FileConfigureService}.
+  */
+ @AutoService(IConfigurableService.class)
+ @ServiceName(ConfigurableServcieType.FILE_SERVICE_NAME)
+ public class FileSupportParentConfigureService extends AbstractSupportParentConfigureService {
+
+     /**
+      * Creates the two file-backed delegates from the properties passed in. The original read the
+      * {@code properties} field and ignored the parameter, which only worked because the caller
+      * happens to assign the field first.
+      *
+      * @param property the configuration for both delegates
+      */
+     @Override
+     protected void initBeforeInitConfigurable(Properties property) {
+         this.configureService = new FileConfigureService(property);
+         this.parentConfigureService = new FileConfigureService(property);
+     }
+
+ }
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java
new file mode 100644
index 0000000..b6d1fd9
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.configurable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+
+public class MemoryConfigureService extends AbstractConfigurableService {
+
+ private static Map<String, List<IConfigurable>> namespace2Configure = new HashMap<>();
+
+ /**
+  * Creates the service; the properties are only handed to the superclass.
+  *
+  * @param properties the configuration passed to the superclass constructor
+  */
+ public MemoryConfigureService(Properties properties) {
+     super(properties);
+ }
+
+ /**
+  * Returns copies of the configurables stored for a namespace. Each copy is a new instance of the
+  * stored object's class, populated from the stored object's JSON plus its namespace and name.
+  *
+  * @param namespace the namespace to read
+  * @return a successful result; its list is {@code null} when nothing is stored for the namespace
+  */
+ @Override
+ protected GetConfigureResult loadConfigurable(String namespace) {
+     GetConfigureResult result = new GetConfigureResult();
+     result.setQuerySuccess(true);
+     List<IConfigurable> stored = namespace2Configure.get(namespace);
+     if (stored == null) {
+         result.setConfigurables(null);
+         return result;
+     }
+     List<IConfigurable> copies = new ArrayList<>();
+     // Iterate over a snapshot of the stored list rather than the stored list itself.
+     List<IConfigurable> snapshot = new ArrayList<IConfigurable>(stored);
+     for (IConfigurable original : snapshot) {
+         IConfigurable copy = ReflectUtil.forInstance(original.getClass());
+         copy.toObject(original.toJson());
+         copy.setNameSpace(original.getNameSpace());
+         copy.setConfigureName(original.getConfigureName());
+         copies.add(copy);
+     }
+     result.setConfigurables(copies);
+     return result;
+ }
+
+ @Override
+ protected void insertConfigurable(IConfigurable configurable) {
+ if (configurable == null) {
+ return;
+ }
+
+ String namespace = configurable.getNameSpace();
+ List<IConfigurable> list = namespace2Configure.get(namespace);
+ if (list == null) {
+ synchronized (this) {
+ list = namespace2Configure.get(namespace);
+ if (list == null) {
+ list = new ArrayList<>();
+ namespace2Configure.put(namespace, list);
+ }
+ }
+ }
+ int removeIndex = -1;
+ for (int i = 0; i < list.size(); i++) {
+ IConfigurable config = list.get(i);
+ if (config.getType().equals(configurable.getType()) && config.getConfigureName()
+ .equals(configurable.getConfigureName())) {
+ removeIndex = i;
+ }
+ }
+ if (AbstractConfigurable.class.isInstance(configurable)) {
+ ((AbstractConfigurable)configurable).setConfigurableService(this);
+ }
+ if (removeIndex != -1) {
+ list.remove(removeIndex);
+ }
+ list.add(configurable);
+ }
+
+ /**
+  * Replaces the first stored entry that has the same type and name within the configurable's
+  * namespace.
+  *
+  * @param configure the configurable carrying the new state
+  * @throws RuntimeException when the namespace holds no entry with that type and name
+  */
+ @Override
+ protected void updateConfigurable(IConfigurable configure) {
+     List<IConfigurable> stored = namespace2Configure.get(configure.getNameSpace());
+     if (stored != null) {
+         for (int index = 0; index < stored.size(); index++) {
+             IConfigurable existing = stored.get(index);
+             boolean sameType = existing.getType().equals(configure.getType());
+             if (sameType && existing.getConfigureName().equals(configure.getConfigureName())) {
+                 stored.set(index, configure);
+                 return;
+             }
+         }
+     }
+     // Reached for a missing namespace, an empty list, or when no entry matched.
+     throw new RuntimeException(
+         "not have exist configure " + configure.getNameSpace() + "," + configure.getType() + ","
+             + configure.getConfigureName());
+ }
+
+ @Override
+ public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+ refreshConfigurable(getNamespace());
+ return queryConfigurableByType(type);
+ }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java
new file mode 100644
index 0000000..92d2095
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service.impl;
+
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.configurable.service.AbstractSupportParentConfigureService;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import com.google.auto.service.AutoService;
+
+ /**
+  * In-memory service with parent-namespace support: both this namespace and the parent namespace
+  * are served by a {@link MemoryConfigureService}.
+  */
+ @AutoService(IConfigurableService.class)
+ @ServiceName(ConfigurableServcieType.MEMORY_SERVICE_NAME)
+ public class MemorySupportParentConfigureService extends AbstractSupportParentConfigureService {
+
+     /**
+      * Creates the two in-memory delegates from the properties passed in. The original read the
+      * {@code properties} field and ignored the parameter, which only worked because the caller
+      * happens to assign the field first.
+      *
+      * @param property the configuration for both delegates
+      */
+     @Override
+     protected void initBeforeInitConfigurable(Properties property) {
+         this.parentConfigureService = new MemoryConfigureService(property);
+         this.configureService = new MemoryConfigureService(property);
+     }
+ }
diff --git a/rocketmq-streams-configurable/src/main/resources/log4j.xml b/rocketmq-streams-configurable/src/main/resources/log4j.xml
new file mode 100755
index 0000000..7812fe7
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/resources/log4j.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <!-- Console appender: ISO-8601 timestamp, caller location, thread, level, message. -->
+    <appender name="Console" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d{ISO8601} %l [%t] %-5p - %m%n%n"/>
+        </layout>
+        <!-- Pass INFO through FATAL; an upper bound of ERROR would deny FATAL events. -->
+        <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="LevelMin" value="INFO"/>
+            <param name="LevelMax" value="FATAL"/>
+        </filter>
+    </appender>
+
+    <!-- Root logger: INFO and above, written to the console appender only. -->
+    <root>
+        <priority value="INFO"/>
+        <appender-ref ref="Console"/>
+    </root>
+
+</log4j:configuration>
\ No newline at end of file
diff --git a/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/ConfiguableComponentTest.java b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/ConfiguableComponentTest.java
new file mode 100644
index 0000000..50cc83b
--- /dev/null
+++ b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/ConfiguableComponentTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable;
+
+import java.util.List;
+
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configuable.model.Person;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertTrue;
+
+public class ConfiguableComponentTest {
+
+ @Test
+ public void testInsertConfiguable(){
+ String namespace="org.apache.configuable.test";
+ ConfigurableComponent configurableComponent= ConfigurableComponent.getInstance(namespace);
+ Person person=createPerson(namespace);
+ configurableComponent.insert(person);//完成数据存储,在配置文件配置存储类型,支持内存,db和文件,默认是内存
+ //查询只操作内存,存储的数据定时加载到内存,刚插入的数据,还未加载,查询不到
+ assertTrue(configurableComponent.queryConfigurable("person","personName")==null);
+ configurableComponent.refreshConfigurable(namespace);//强制加载数据到内存,可以查询数据
+ assertTrue(configurableComponent.queryConfigurable("person","peronName")!=null);
+ }
+
+
+ @Test
+ public void testConfiguableENVDependence(){
+ String namespace="org.apache.configuable.test";
+ ConfigurableComponent configurableComponent= ConfigurableComponent.getInstance(namespace);
+ Person person=createPerson(namespace);
+ person.setName("persion.name");//对于有ENVDependence的字段,可以不存储真值,存储一个key,把真值配置在配置文件中
+ configurableComponent.insert(person);//完成数据存储,在配置文件配置存储类型,支持内存,db和文件,默认是内存
+ ComponentCreator.getProperties().put("persion.name","realName");//这个代表真实的配置文件,启动时会把配置文件的内容加载到ComponentCreator.getProperties()中
+ configurableComponent.refreshConfigurable(namespace);//刷新存储
+ person=configurableComponent.queryConfigurable("person","peronName");
+ assertTrue(person.getName().equals("realName"));
+ }
+
+
+ @Test
+ public void testSupportParentNameSpace(){
+ String namespace="org.apache.configuable.test";
+ ConfigurableComponent configurableComponent= ConfigurableComponent.getInstance(namespace);
+ Person person=createPerson(namespace);
+ Person otherPerson=createPerson("org.apache.configuable.test1");
+ configurableComponent.insert(person);
+ configurableComponent.insert(otherPerson);
+ configurableComponent.refreshConfigurable(namespace);
+ //只加载自己命名空间的对象
+ List<Person> personList=configurableComponent.queryConfigurableByType("person");
+ assertTrue(personList.size()==1);
+
+ /**
+ * 顶级命名空间的对象,所有namespace都可见
+ */
+ Person thirdPerson=createPerson(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+ configurableComponent.insert(thirdPerson);
+ configurableComponent.refreshConfigurable(namespace);//只加载自己命名空间的对象
+ personList=configurableComponent.queryConfigurableByType("person");
+ assertTrue(personList.size()==2);
+ }
+
+
+ //测试定时加载逻辑,当对象的updateFlag值变化后,才会被替换旧对象
+ @Test
+ public void testAutoLoader() throws InterruptedException {
+ ComponentCreator.getProperties().put(AbstractComponent.POLLING_TIME,"1");//1秒后动态加载对象
+ String namespace="org.apache.configuable.test";
+ ConfigurableComponent configurableComponent= ConfigurableComponent.getInstance(namespace);
+ Person person=createPerson(namespace);
+ configurableComponent.insert(person);
+ Thread.sleep(2000);//1秒后,新插入的对象会被加载
+ person=configurableComponent.queryConfigurable("person","peronName");
+ assertTrue(person!=null);
+
+
+ }
+
+ /**
+ * 创建configuable对象
+ * @param namespace
+ * @return
+ */
+ protected Person createPerson(String namespace){
+ Person person=new Person();
+ person.setName("chris");
+ person.setAge(18);
+ person.setNameSpace(namespace);
+ person.setConfigureName("peronName");
+ person.setType("person");
+ return person;
+ }
+}
diff --git a/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java
new file mode 100644
index 0000000..8c4f076
--- /dev/null
+++ b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+/**
+ * Sample configurable used by the tests: a person with a name, age, gender flag,
+ * a list of addresses and a map from child name to child age.
+ */
+public class Person extends BasedConfigurable{
+    // NOTE(review): presumably @ENVDependence lets the stored value be a property key
+    // that is resolved when the object is loaded -- confirm against the annotation.
+    @ENVDependence
+    private String name;
+    private int age;
+    private Boolean isMale;
+    private List<String> addresses;
+    private Map<String, Integer> childName2Age;
+
+    /**
+     * Builds a fully populated sample person.
+     *
+     * @param namespace namespace to assign to the new object
+     * @return a person of type {@code "person"} with configure name and name {@code "Chris"}
+     */
+    public static Person createPerson(String namespace) {
+        List<String> homes = new ArrayList<>();
+        homes.add("huilongguan");
+        homes.add("shangdi");
+
+        Map<String, Integer> kids = new HashMap<>();
+        kids.put("yuanyahan", 8);
+        kids.put("yuanruxi", 4);
+
+        Person sample = new Person();
+        sample.setNameSpace(namespace);
+        sample.setType("person");
+        sample.setConfigureName("Chris");
+        sample.setName("Chris");
+        sample.setAddresses(homes);
+        sample.setChildName2Age(kids);
+        sample.setMale(true);
+        sample.setAge(18);
+        return sample;
+    }
+
+    /** Renders all five fields in {@code Person{...}} form. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("Person{");
+        text.append("name='").append(this.name).append('\'');
+        text.append(", age=").append(this.age);
+        text.append(", isMale=").append(this.isMale);
+        text.append(", addresses=").append(this.addresses);
+        text.append(", childName2Age=").append(this.childName2Age);
+        return text.append('}').toString();
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public int getAge() {
+        return this.age;
+    }
+
+    public void setAge(int age) {
+        this.age = age;
+    }
+
+    public Boolean getMale() {
+        return this.isMale;
+    }
+
+    public void setMale(Boolean male) {
+        this.isMale = male;
+    }
+
+    public List<String> getAddresses() {
+        return this.addresses;
+    }
+
+    public void setAddresses(List<String> addresses) {
+        this.addresses = addresses;
+    }
+
+    public Map<String, Integer> getChildName2Age() {
+        return this.childName2Age;
+    }
+
+    public void setChildName2Age(Map<String, Integer> childName2Age) {
+        this.childName2Age = childName2Age;
+    }
+
+    /**
+     * Returns the result of {@code super.clone()} cast to {@code Person}.
+     * If cloning is not supported, the error is printed to standard output
+     * and {@code null} is returned.
+     */
+    @Override
+    public Object clone() {
+        try {
+            return (Person)super.clone();
+        } catch (CloneNotSupportedException e) {
+            System.out.println("clone error " + e);
+            return null;
+        }
+    }
+}
diff --git a/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/ConfigurableComponentTest.java b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/ConfigurableComponentTest.java
new file mode 100644
index 0000000..6e632b1
--- /dev/null
+++ b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/ConfigurableComponentTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable;
+
+import java.util.List;
+
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.model.Person;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertTrue;
+
+public class ConfigurableComponentTest {
+
+ @Test
+ public void testInsertConfigurable() {
+ String namespace = "org.apache.configurable.test";
+ ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+ Person person = createPerson(namespace);
+ configurableComponent.insert(person);//完成数据存储,在配置文件配置存储类型,支持内存,db和文件,默认是内存
+ //查询只操作内存,存储的数据定时加载到内存,刚插入的数据,还未加载,查询不到
+ assertTrue(configurableComponent.queryConfigurable("person", "personName") == null);
+ configurableComponent.refreshConfigurable(namespace);//强制加载数据到内存,可以查询数据
+ assertTrue(configurableComponent.queryConfigurable("person", "peronName") != null);
+ }
+
+ @Test
+ public void testConfigurableENVDependence() {
+ String namespace = "org.apache.configurable.test";
+ ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+ Person person = createPerson(namespace);
+ person.setName("persion.name");//对于有ENVDependence的字段,可以不存储真值,存储一个key,把真值配置在配置文件中
+ configurableComponent.insert(person);//完成数据存储,在配置文件配置存储类型,支持内存,db和文件,默认是内存
+ ComponentCreator.getProperties().put("persion.name", "realName");//这个代表真实的配置文件,启动时会把配置文件的内容加载到ComponentCreator.getProperties()中
+ configurableComponent.refreshConfigurable(namespace);//刷新存储
+ person = configurableComponent.queryConfigurable("person", "peronName");
+ assertTrue(person.getName().equals("realName"));
+ }
+
+ @Test
+ public void testSupportParentNameSpace() {
+ String namespace = "org.apache.configurable.test";
+ ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+ Person person = createPerson(namespace);
+ Person otherPerson = createPerson("org.apache.configuable.test1");
+ configurableComponent.insert(person);
+ configurableComponent.insert(otherPerson);
+ configurableComponent.refreshConfigurable(namespace);
+ //只加载自己命名空间的对象
+ List<Person> personList = configurableComponent.queryConfigurableByType("person");
+ assertTrue(personList.size() == 1);
+
+ /**
+ * 顶级命名空间的对象,所有namespace都可见
+ */
+ Person thirdPerson = createPerson(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+ configurableComponent.insert(thirdPerson);
+ configurableComponent.refreshConfigurable(namespace);//只加载自己命名空间的对象
+ personList = configurableComponent.queryConfigurableByType("person");
+ assertTrue(personList.size() == 2);
+ }
+
+ //测试定时加载逻辑,当对象的updateFlag值变化后,才会被替换旧对象
+ @Test
+ public void testAutoLoader() throws InterruptedException {
+ ComponentCreator.getProperties().put(AbstractComponent.POLLING_TIME, "1");//1秒后动态加载对象
+ String namespace = "org.apache.configurable.test";
+ ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+ Person person = createPerson(namespace);
+ configurableComponent.insert(person);
+ Thread.sleep(2000);//1秒后,新插入的对象会被加载
+ person = configurableComponent.queryConfigurable("person", "peronName");
+ assertTrue(person != null);
+
+ }
+
+ /**
+ * 创建configurable对象
+ *
+ * @param namespace
+ * @return
+ */
+ protected Person createPerson(String namespace) {
+ Person person = new Person();
+ person.setName("chris");
+ person.setAge(18);
+ person.setNameSpace(namespace);
+ person.setConfigureName("peronName");
+ person.setType("person");
+ return person;
+ }
+}
diff --git a/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java
new file mode 100644
index 0000000..06fd678
--- /dev/null
+++ b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+
+/**
+ * Sample configurable used by the tests: a person with a name, age, gender flag,
+ * a list of addresses and a map from child name to child age.
+ */
+public class Person extends BasedConfigurable {
+    // NOTE(review): presumably @ENVDependence lets the stored value be a property key
+    // that is resolved when the object is loaded -- confirm against the annotation.
+    @ENVDependence
+    private String name;
+    private int age;
+    private Boolean isMale;
+    private List<String> addresses;
+    private Map<String, Integer> childName2Age;
+
+    /**
+     * Builds a fully populated sample person.
+     *
+     * @param namespace namespace to assign to the new object
+     * @return a person of type {@code "person"} with configure name and name {@code "Chris"}
+     */
+    public static Person createPerson(String namespace) {
+        List<String> homes = new ArrayList<>();
+        homes.add("huilongguan");
+        homes.add("shangdi");
+
+        Map<String, Integer> kids = new HashMap<>();
+        kids.put("yuanyahan", 8);
+        kids.put("yuanruxi", 4);
+
+        Person sample = new Person();
+        sample.setNameSpace(namespace);
+        sample.setType("person");
+        sample.setConfigureName("Chris");
+        sample.setName("Chris");
+        sample.setAddresses(homes);
+        sample.setChildName2Age(kids);
+        sample.setMale(true);
+        sample.setAge(18);
+        return sample;
+    }
+
+    /** Renders all five fields in {@code Person{...}} form. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("Person{");
+        text.append("name='").append(this.name).append('\'');
+        text.append(", age=").append(this.age);
+        text.append(", isMale=").append(this.isMale);
+        text.append(", addresses=").append(this.addresses);
+        text.append(", childName2Age=").append(this.childName2Age);
+        return text.append('}').toString();
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public int getAge() {
+        return this.age;
+    }
+
+    public void setAge(int age) {
+        this.age = age;
+    }
+
+    public Boolean getMale() {
+        return this.isMale;
+    }
+
+    public void setMale(Boolean male) {
+        this.isMale = male;
+    }
+
+    public List<String> getAddresses() {
+        return this.addresses;
+    }
+
+    public void setAddresses(List<String> addresses) {
+        this.addresses = addresses;
+    }
+
+    public Map<String, Integer> getChildName2Age() {
+        return this.childName2Age;
+    }
+
+    public void setChildName2Age(Map<String, Integer> childName2Age) {
+        this.childName2Age = childName2Age;
+    }
+
+    /**
+     * Returns the result of {@code super.clone()} cast to {@code Person}.
+     * If cloning is not supported, the error is printed to standard output
+     * and {@code null} is returned.
+     */
+    @Override
+    public Object clone() {
+        try {
+            return (Person)super.clone();
+        } catch (CloneNotSupportedException e) {
+            System.out.println("clone error " + e);
+            return null;
+        }
+    }
+}
diff --git a/rocketmq-streams-configurable/src/test/resources/log4j.xml b/rocketmq-streams-configurable/src/test/resources/log4j.xml
new file mode 100755
index 0000000..7812fe7
--- /dev/null
+++ b/rocketmq-streams-configurable/src/test/resources/log4j.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <!-- Console appender: ISO-8601 timestamp, caller location, thread, level, message. -->
+    <appender name="Console" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d{ISO8601} %l [%t] %-5p - %m%n%n"/>
+        </layout>
+        <!-- Pass INFO through FATAL; an upper bound of ERROR would deny FATAL events. -->
+        <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="LevelMin" value="INFO"/>
+            <param name="LevelMax" value="FATAL"/>
+        </filter>
+    </appender>
+
+    <!-- Root logger: INFO and above, written to the console appender only. -->
+    <root>
+        <priority value="INFO"/>
+        <appender-ref ref="Console"/>
+    </root>
+
+</log4j:configuration>
\ No newline at end of file