You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/08/02 10:55:18 UTC

[rocketmq-streams] 05/15: add channel-configurable

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit 1992a3514e723f452b59878be8898d855eab2c7f
Author: vv <ze...@alibaba-inc.com>
AuthorDate: Mon Aug 2 12:04:58 2021 +0800

    add channel-configurable
---
 rocketmq-streams-configurable/pom.xml              |  21 +
 .../streams/configuable/ConfigurableComponent.java | 189 +++++++
 .../streams/configuable/model/Configure.java       | 130 +++++
 .../service/AbstractConfigurableService.java       | 561 +++++++++++++++++++++
 .../AbstractSupportParentConfigureService.java     | 263 ++++++++++
 .../service/ConfigurableServcieType.java           |  31 ++
 .../service/ConfigurableServiceFactory.java        |  58 +++
 .../service/impl/FileConfigureService.java         | 250 +++++++++
 .../impl/FileSupportParentConfigureService.java    |  38 ++
 .../service/impl/MemoryConfigureService.java       | 122 +++++
 .../impl/MemorySupportParentConfigureService.java  |  39 ++
 .../configurable/ConfigurableComponent.java        | 188 +++++++
 .../streams/configurable/model/Configure.java      | 127 +++++
 .../service/AbstractConfigurableService.java       | 553 ++++++++++++++++++++
 .../AbstractSupportParentConfigureService.java     | 255 ++++++++++
 .../service/ConfigurableServcieType.java           |  29 ++
 .../service/ConfigurableServiceFactory.java        |  59 +++
 .../service/impl/FileConfigureService.java         | 249 +++++++++
 .../impl/FileSupportParentConfigureService.java    |  37 ++
 .../service/impl/MemoryConfigureService.java       | 121 +++++
 .../impl/MemorySupportParentConfigureService.java  |  37 ++
 .../src/main/resources/log4j.xml                   |  20 +
 .../configuable/ConfiguableComponentTest.java      | 112 ++++
 .../rocketmq/streams/configuable/model/Person.java | 110 ++++
 .../configurable/ConfigurableComponentTest.java    | 108 ++++
 .../streams/configurable/model/Person.java         | 110 ++++
 .../src/test/resources/log4j.xml                   |  20 +
 27 files changed, 3837 insertions(+)

diff --git a/rocketmq-streams-configurable/pom.xml b/rocketmq-streams-configurable/pom.xml
new file mode 100755
index 0000000..3f837d8
--- /dev/null
+++ b/rocketmq-streams-configurable/pom.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-streams</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>rocketmq-streams-configurable</artifactId>
+    <name>ROCKETMQ STREAMS :: configurable</name>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-serviceloader</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/ConfigurableComponent.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/ConfigurableComponent.java
new file mode 100644
index 0000000..fbb35ca
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/ConfigurableComponent.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.component.ConfigureDescriptor;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
+import org.apache.rocketmq.streams.configuable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.configuable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.configuable.service.ConfigurableServiceFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Central manager for Configurable objects: unified query, insert and update. insert/update write the configurable object to storage; file storage (file), in-memory storage (memory) and database storage (DB) are supported and are selected in the configuration through the ConfigureFileKey.CONNECT_TYPE key. Queries are served from memory; objects are loaded into memory periodically, and the load interval (in seconds) can be set in the properties file through the ConfigureFileKey.POLLING_TIME key. Newly loaded objects take effect; an already existing object is replaced only when its updateFlag has changed.
+ */
+public class ConfigurableComponent extends AbstractComponent<IConfigurableService>
+    implements IConfigurableService {
+
+    private static final Log LOG = LogFactory.getLog(ConfigurableComponent.class);
+
+    protected volatile IConfigurableService configureService = null;
+
+    protected transient String namespace;
+
+    public ConfigurableComponent() {
+        initConfigurableServiceDescriptor();
+        addConfigureDescriptor(
+            new ConfigureDescriptor(CONNECT_TYPE, false, ConfigurableServcieType.DEFAULT_SERVICE_NAME));
+    }
+
+    public static ConfigurableComponent getInstance(String namespace) {
+        return ComponentCreator.getComponent(namespace, ConfigurableComponent.class);
+    }
+
+    @Override
+    protected boolean initProperties(Properties properties) {
+        try {
+            if (configureService != null) {
+                return true;
+            }
+            this.configureService = ConfigurableServiceFactory.createConfigurableService(properties);
+            return true;
+        } catch (Exception e) {
+            LOG.error("ConfigurableComponent create error,properties= " + properties, e);
+            return false;
+        }
+
+    }
+
+    @Override
+    public boolean startComponent(String namespace) {
+        try {
+            this.namespace = namespace;
+            configureService.initConfigurables(namespace);
+            return true;
+        } catch (Exception e) {
+            LOG.error("ConfigurableComponent init error, namespace is " + namespace, e);
+            return false;
+        }
+
+    }
+
+    /**
+     * Starts test mode: configurable data is stored in and loaded from the in-memory store.
+     */
+    public static void begineTestMode() {
+        System.setProperty(ConfigurableComponent.CONNECT_TYPE, ConfigurableServcieType.MEMORY_SERVICE_NAME);
+    }
+
+    /**
+     * Ends test mode: configurable data is loaded using the properties set in the configuration file.
+     */
+    public static void endTestMode() {
+        System.clearProperty(ConfigurableComponent.CONNECT_TYPE);
+    }
+
+    @Override
+    public boolean stop() {
+        return true;
+    }
+
+    @Override
+    public IConfigurableService getService() {
+        return configureService;
+    }
+
+    @Override
+    public void initConfigurables(String namespace) {
+        configureService.initConfigurables(namespace);
+    }
+
+    @Override
+    public boolean refreshConfigurable(String namespace) {
+        return configureService.refreshConfigurable(namespace);
+    }
+
+    public void mockConfigurable(String namespace) {
+        refreshConfigurable(namespace);
+
+    }
+
+    @Override
+    public List<IConfigurable> queryConfigurable(String type) {
+        return configureService.queryConfigurable(type);
+    }
+
+    @Override
+    public <T extends IConfigurable> List<T> queryConfigurableByType(String type) {
+        return configureService.queryConfigurableByType(type);
+    }
+
+    @Override
+    public IConfigurable queryConfigurableByIdent(String type, String name) {
+        return configureService.queryConfigurableByIdent(type, name);
+    }
+
+    @Override
+    public IConfigurable queryConfigurableByIdent(String identification) {
+        return configureService.queryConfigurableByIdent(identification);
+    }
+
+    @Override
+    public void insert(IConfigurable configurable) {
+        configureService.insert(configurable);
+        ConfigurableUtil.refreshMock(configurable);
+    }
+
+    @Override
+    public void update(IConfigurable configurable) {
+        configureService.update(configurable);
+    }
+
+    @Override
+    public <T> Map<String, T> queryConfigurableMapByType(String type) {
+        return configureService.queryConfigurableMapByType(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T queryConfigurable(String configurableType, String name) {
+        return (T)queryConfigurableByIdent(configurableType, name);
+    }
+
+    //protected void insertConfigurable(JSONObject message, IConfigurable configurable) {
+    //    ConfigurableUtil.insertConfigurable(message, configurable, this.configureService);
+    //}
+
+    @Override
+    public String getNamespace() {
+        if (AbstractConfigurableService.class.isInstance(configureService)) {
+            return ((AbstractConfigurableService)configureService).getNamespace();
+        }
+        return namespace;
+    }
+
+    @Override
+    public Collection<IConfigurable> findAll() {
+        return configureService.findAll();
+    }
+
+    @Override
+    public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+        return configureService.loadConfigurableFromStorage(type);
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/model/Configure.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/model/Configure.java
new file mode 100644
index 0000000..121233d
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/model/Configure.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.model;
+
+import org.apache.rocketmq.streams.common.model.Entity;
+
+/**
+ *
+ * When configurables are stored in a database, this class is the object mapped to the DB table.
+ */
+public class Configure extends Entity {
+
+    private static final long serialVersionUID = 5668017348345235669L;
+
+    private String nameSpace;
+    private String type;
+    private String name;
+    // private String identification;
+    private String jsonValue;
+    private String modifyTime;
+    private String remark;
+    private int openRange;
+
+
+    public static String createTableSQL(String tableName){ // tableName is concatenated unescaped into the DDL: pass trusted identifiers only
+        return "/******************************************/\n"
+                + "/*   TableName = dipper_configure   */\n"
+                + "/******************************************/\n"
+                + "CREATE TABLE IF NOT EXISTS `"+tableName+"` (\n"
+                + "  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',\n"
+                + "  `gmt_create` datetime NOT NULL COMMENT '创建时间',\n"
+                + "  `gmt_modified` datetime NOT NULL COMMENT '修改时间',\n"
+                + "  `namespace` varchar(32) NOT NULL COMMENT '项目标识',\n"
+                + "  `type` varchar(32) NOT NULL COMMENT '配置类型',\n"
+                + "  `name` varchar(128) NOT NULL COMMENT '配置名称',\n"
+                + "  `json_value` text NOT NULL COMMENT '配置内容',\n"
+                + "  `status` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '1:正在使用 0:已失效',\n"
+                + "  PRIMARY KEY (`id`),\n"
+                + "  UNIQUE KEY `uk_namespace_type_name` (`namespace`,`type`,`name`),\n"
+                + "  KEY `idx_namespace` (`namespace`)\n"
+                + ") ENGINE=InnoDB AUTO_INCREMENT=1814834 DEFAULT CHARSET=utf8 COMMENT='统一接入配置项'\n"
+                + ";";
+    }
+
+    public String getNameSpace() {
+        return nameSpace;
+    }
+
+    public void setNameSpace(String nameSpace) {
+        this.nameSpace = nameSpace;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    // public String getIdentification() {
+    // return identification;
+    // }
+
+    // public void createIdentification() {
+    // this.identification = MapKeyUtil.createKey(nameSpace, type, name);
+    // }
+
+    public String getJsonValue() {
+        return jsonValue;
+    }
+
+    public void setJsonValue(String jsonValue) {
+        this.jsonValue = jsonValue;
+    }
+
+    public String getModifyTime() {
+        return modifyTime;
+    }
+
+    public void setModifyTime(String modifyTime) {
+        this.modifyTime = modifyTime;
+    }
+
+    public String getRemark() {
+        return remark;
+    }
+
+    public void setRemark(String remark) {
+        this.remark = remark;
+    }
+
+    public int getOpenRange() {
+        return openRange;
+    }
+
+    public void setOpenRange(int openRange) {
+        this.openRange = openRange;
+    }
+
+    @Override
+    public String toString() {
+        return "Configure{" + "nameSpace='" + nameSpace + '\'' + ", type='" + type + '\'' + ", name='" + name + '\''
+                + ", jsonValue='" + jsonValue + '\'' + ", modifyTime='" + modifyTime + '\'' + ", remark='" + remark + '\''
+                + ", openRange=" + openRange + '}';
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractConfigurableService.java
new file mode 100644
index 0000000..9f55da8
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractConfigurableService.java
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service;
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.datatype.DataType;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
+import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.configuable.model.Configure;
+
+public abstract class AbstractConfigurableService implements IConfigurableService {
+
+    private static final Log LOG = LogFactory.getLog(AbstractConfigurableService.class);
+
+    private static final String CLASS_NAME = IConfigurableService.CLASS_NAME;
+
+    protected Map<String, List<IConfigurable>> type2ConfigurableMap = new HashMap<>();
+
+    protected Map<String, IConfigurable> name2ConfigurableMap = new HashMap<>();
+
+    protected Map<String, IConfigurable> configurableMap = new HashMap<>();
+
+    protected Properties properties;
+
+    protected transient String namespace;
+
+    public AbstractConfigurableService(Properties properties) {
+        this.properties = properties; // read by initConfigurables to find the polling interval
+    }
+
+    public AbstractConfigurableService() {
+    } // properties is not set here; initConfigurables schedules no polling while it is null
+
+    @Override
+    public IConfigurable queryConfigurableByIdent(String identification) {
+        return name2ConfigurableMap.get(identification); // entries are keyed by MapKeyUtil.createKey(type, name); null when absent
+    }
+
+    protected String getConfigureKey(String nameSpace, String type, String name) {
+        return MapKeyUtil.createKey(nameSpace, type, name); // key format used for configurableMap
+    }
+
+    protected void updateConfiguresCache(IConfigurable configurable) {
+        // Null entries are ignored; anything else is cached under its namespace/type/name key.
+        if (configurable != null) {
+            configurable.toJson();
+            configurableMap.put(getConfigureKey(configurable.getNameSpace(), configurable.getType(),
+                configurable.getConfigureName()), configurable);
+        }
+    }
+
+    protected void updateConfiguresCache(List<IConfigurable> configureList) {
+        // Cache every element through the single-object overload.
+        configureList.forEach(
+            configurable -> updateConfiguresCache(configurable));
+    }
+
+    protected boolean equals(String key, List<?> newConfigureList) {
+        // Compares the cached configurable stored under key with the first list element that has the same key.
+        for (Object candidate : newConfigureList) {
+            IConfigurable fresh = (IConfigurable)candidate;
+            boolean sameKey = key.equals(
+                getConfigureKey(fresh.getNameSpace(), fresh.getType(), fresh.getConfigureName()));
+            IConfigurable cached = sameKey ? configurableMap.get(key) : null;
+            if (cached != null) {
+                return ConfigurableUtil.compare(cached, fresh);
+            }
+        }
+        // No element with that key, or nothing cached under it yet.
+        return false;
+    }
+
+    /**
+     * Copies whatever queryConfigurable(type) returns into a new typed list; the list is empty when it returns null.
+     */
+    @Override
+    public <T extends IConfigurable> List<T> queryConfigurableByType(String type) {
+        List<T> typed = new ArrayList<T>();
+        List<IConfigurable> found = queryConfigurable(type);
+        if (found != null) {
+            found.forEach(item -> typed.add((T)item));
+        }
+        return typed;
+    }
+
+    @Override
+    public boolean refreshConfigurable(String namespace) {
+        // On every refresh, reload the configuration file (currently disabled, see the commented-out code below)
+        //if(ComponentCreator.propertiesPath!=null){
+        //    ComponentCreator.setProperties(ComponentCreator.propertiesPath);
+        //}
+        this.namespace = namespace;
+        // Fresh lookup maps; checkAndUpdateConfigurables installs them in place of the current ones.
+        Map<String, List<IConfigurable>> tempType2ConfigurableMap = new HashMap<>();
+        Map<String, IConfigurable> tempName2ConfigurableMap = new HashMap<>();
+        GetConfigureResult configures = loadConfigurable(namespace);
+        // Only a successful load with a non-null list is applied; otherwise the current maps are kept and false is returned.
+        if (configures != null && configures.isQuerySuccess() && configures.getConfigurables() != null) {
+            // The loaded list is passed twice; the last argument is not used by the implementation in this class.
+            List<IConfigurable> configurables = configures.getConfigurables();
+            List<IConfigurable> configurableList = checkAndUpdateConfigurables(namespace, configurables,
+                tempType2ConfigurableMap, tempName2ConfigurableMap,
+                configures.getConfigurables());
+            // Notify the configurables that were newly initialized or replaced during this refresh.
+            for (IConfigurable configurable : configurableList) {
+                if (configurable instanceof IAfterConfiguableRefreshListerner) {
+                    ((IAfterConfiguableRefreshListerner)configurable).doProcessAfterRefreshConfigurable(this);
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public <T> T queryConfigurable(String configurableType, String name) {
+        return (T)queryConfigurableByIdent(configurableType, name); // unchecked cast to the caller-chosen T
+    }
+
+    protected List<IConfigurable> checkAndUpdateConfigurables(String namespace, List<IConfigurable> configurables,
+                                                              Map<String, List<IConfigurable>> tempType2ConfigurableMap,
+                                                              Map<String, IConfigurable> tempName2ConfigurableMap,
+                                                              List configures) {
+        // Returns only the configurables that update(...) reported as newly initialized or replaced.
+        List<IConfigurable> changed = new ArrayList<>();
+        for (IConfigurable candidate : configurables) {
+            try {
+                if (update(candidate, tempName2ConfigurableMap, tempType2ConfigurableMap)) {
+                    changed.add(candidate);
+                }
+            } catch (Exception e) {
+                LOG.error("组件初始化异常:" + e.getMessage() + ",name=" + candidate.getConfigureName(), e);
+            }
+        }
+        destroyOldConfigurables(tempName2ConfigurableMap);
+        this.name2ConfigurableMap = tempName2ConfigurableMap;
+        this.type2ConfigurableMap = tempType2ConfigurableMap;
+        return changed;
+    }
+
+    /**
+     * Destroys every currently registered configurable whose key is missing from the given new map.
+     */
+    private void destroyOldConfigurables(Map<String, IConfigurable> tempName2ConfigurableMap) {
+        for (Map.Entry<String, IConfigurable> current : this.name2ConfigurableMap.entrySet()) {
+            boolean stillPresent = tempName2ConfigurableMap.containsKey(current.getKey());
+            if (!stillPresent) {
+                destroyOldConfigurable(current.getValue());
+            }
+        }
+
+    }
+
+    private void destroyOldConfigurable(IConfigurable oldConfigurable) {
+        // Destroy the instance when it supports it, then drop it from the comparison cache.
+        if (oldConfigurable instanceof AbstractConfigurable) {
+            ((AbstractConfigurable)oldConfigurable).destroy();
+        }
+        configurableMap.remove(getConfigureKey(oldConfigurable.getNameSpace(), oldConfigurable.getType(),
+            oldConfigurable.getConfigureName()));
+    }
+
+    protected void initConfigurable(IConfigurable configurable) {
+        // Inject this service before init() runs.
+        if (configurable instanceof AbstractConfigurable) {
+            ((AbstractConfigurable)configurable).setConfigurableService(this);
+        }
+
+        configurable.init();
+
+    }
+
+    /**
+     * Scheduler of the periodic refresh; internal use only. Stays null until polling is enabled.
+     */
+    private ScheduledExecutorService scheduledExecutorService;
+
+    @Override
+    public void initConfigurables(final String namespace) {
+        refreshConfigurable(namespace);
+        long polingTime = -1;
+        if (this.properties != null) {
+            String pollingTimeStr = this.properties.getProperty(AbstractComponent.POLLING_TIME);
+            if (StringUtil.isNotEmpty(pollingTimeStr)) {
+                polingTime = Long.parseLong(pollingTimeStr.trim());
+            }
+        }
+        if (polingTime > 0) {
+            // Stop the scheduler of an earlier call so repeated initialization neither leaks threads nor refreshes twice.
+            if (scheduledExecutorService != null) {
+                scheduledExecutorService.shutdownNow();
+            }
+            scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+            scheduledExecutorService.scheduleWithFixedDelay(() -> {
+                try {
+                    refreshConfigurable(namespace);
+                } catch (Exception e) {
+                    LOG.error("Load configurables error:" + e.getMessage(), e);
+                }
+            }, polingTime, polingTime, TimeUnit.SECONDS);
+        }
+    }
+    // @Override
+    // public List<IConfigurable> queryConfigurable(String nameSpace) {
+    // return namespace2ConfigurableMap.get(nameSpace);
+    // }
+
+    @Override
+    public List<IConfigurable> queryConfigurable(String type) {
+        // Returns null when the type map has no entry for the key.
+        return type2ConfigurableMap.get(MapKeyUtil.createKey(type));
+    }
+
+    @Override
+    public IConfigurable queryConfigurableByIdent(String type, String name) {
+        // The key combines type and name, the same way update(...) builds it when registering entries.
+        return name2ConfigurableMap.get(MapKeyUtil.createKey(type, name));
+    }
+
+    /**
+     * Loads the configuration information for the given namespace.
+     *
+     * @param namespace namespace whose configurables are loaded
+     * @return the load result; callers in this class check isQuerySuccess() and getConfigurables(),
+     *         and refreshConfigurable also accepts a null result
+     */
+    protected abstract GetConfigureResult loadConfigurable(String namespace);
+
+    @Override
+    public void update(IConfigurable configurable) {
+        // Only delegates to the subclass hook; this method itself does not touch the lookup maps.
+        updateConfigurable(configurable);
+    }
+
+    protected abstract void updateConfigurable(IConfigurable configurable); // hook invoked by update(IConfigurable)
+
+    protected abstract void insertConfigurable(IConfigurable configurable); // hook invoked by insert(IConfigurable)
+
+    protected boolean update(IConfigurable configurable, Map<String, IConfigurable> name2ConfigurableMap,
+                             Map<String, List<IConfigurable>> type2ConfigurableMap) {
+        if (configurable == null) {
+            return false;
+        }
+
+        boolean isUpdate = false; // becomes true when the configurable is initialized by this call
+        List<IConfigurable> configurableList = new ArrayList<>();
+        configurableList.add(configurable);
+
+        String nameKey = MapKeyUtil.createKey(configurable.getType(), configurable.getConfigureName());
+        if (this.name2ConfigurableMap.containsKey(nameKey)) { // this.* is the current map; the parameters shadow the fields and are the maps being filled
+            String configureKey = getConfigureKey(namespace, configurable.getType(), configurable.getConfigureName());
+            IConfigurable oldConfigurable = this.name2ConfigurableMap.get(nameKey);
+            if (equals(configureKey, configurableList)) {
+                configurable = oldConfigurable;
+                // Unchanged: keep the already registered instance instead of the freshly loaded one.
+            } else {
+                destroyOldConfigurable(oldConfigurable);
+                initConfigurable(configurable);
+                isUpdate = true;
+            }
+        } else {
+            initConfigurable(configurable);
+            isUpdate = true;
+        }
+        updateConfiguresCache(configurable);
+        name2ConfigurableMap.put(nameKey, configurable);
+        String typeKey = MapKeyUtil.createKey(configurable.getType());
+        // Also index the instance by its type key.
+        put2Map(type2ConfigurableMap, typeKey, configurable);
+        return isUpdate;
+    }
+
+    @Override
+    public void insert(IConfigurable configurable) {
+        // Only delegates to the subclass hook; this method itself does not touch the lookup maps.
+        insertConfigurable(configurable);
+    }
+
+    /**
+     * Entry point for querying configurables across namespaces; each namespace is loaded via loadConfigurable.
+     * @param namespaces namespaces to load; null or empty yields an empty list
+     * @return a new list with the configurables of all given namespaces, never null
+     * @throws RuntimeException if loading any of the namespaces fails
+     */
+    public List<IConfigurable> queryConfiguableByNamespace(String... namespaces) {
+        List<IConfigurable> configurables = new ArrayList<>();
+        if (namespaces == null || namespaces.length == 0) {
+            return configurables;
+        }
+        for (String namespace : namespaces) {
+            GetConfigureResult result = loadConfigurable(namespace);
+            // A null result counts as a failed load, matching the null check in refreshConfigurable.
+            if (result == null || !result.isQuerySuccess()) {
+                throw new RuntimeException("Load configurable error, the namespace is " + namespace);
+            }
+            if (result.getConfigurables() != null) {
+                configurables.addAll(result.getConfigurables());
+            }
+        }
+        return configurables;
+
+    }
+
+    /**
+     * Appends a configurable to the list stored under key, creating that list first when the key is absent.
+     *
+     * @param map          map whose values are lists of configurables
+     * @param key          key of the list to append to
+     * @param configurable element to append
+     */
+    protected void put2Map(Map<String, List<IConfigurable>> map, String key, IConfigurable configurable) {
+        // computeIfAbsent creates and registers the list on first use of the key.
+        map.computeIfAbsent(key, k -> new ArrayList<IConfigurable>()).add(configurable);
+    }
+
+    @Override
+    public Collection<IConfigurable> findAll() {
+        return name2ConfigurableMap.values(); // the map's own values view, not a copy
+    }
+
+    /**
+     * Converts a configurable into a Configure.
+     *
+     * @param configurable source object; its type, configure name, namespace and JSON form are copied
+     * @return the new Configure; its jsonValue is left unset when toJson() yields an empty string
+     */
+    protected Configure createConfigure(IConfigurable configurable) {
+        Configure configure = new Configure();
+        configure.setType(configurable.getType());
+        configure.setName(configurable.getConfigureName());
+        configure.setNameSpace(configurable.getNameSpace());
+        String jsonString = configurable.toJson();
+        if (!StringUtil.isEmpty(jsonString)) {
+            JSONObject jsonObject = JSONObject.parseObject(jsonString);
+            jsonObject.put(CLASS_NAME, configurable.getClass().getName()); // read back by createConfigurableFromJson
+            configure.setJsonValue(jsonObject.toJSONString());
+        }
+        // configure.createIdentification();
+        return configure;
+    }
+
+    /**
+     * Maps configure name to configurable for everything queryConfigurable(type) returns; empty when it returns null.
+     */
+    @Override
+    public <T> Map<String, T> queryConfigurableMapByType(String type) {
+        Map<String, T> byName = new HashMap<String, T>();
+        List<IConfigurable> found = queryConfigurable(type);
+        if (found != null) {
+            found.forEach(item -> byName.put(item.getConfigureName(), (T)item));
+        }
+        return byName;
+    }
+
+    /**
+     * Converts Configure records into configurables by calling convert(Configure) on each of them.
+     * Records that convert to null are left out of the result.
+     *
+     * @param configures records to convert; may be null
+     * @return a new list with the converted configurables, empty when configures is null
+     */
+    protected List<IConfigurable> convert(List<Configure> configures) {
+        List<IConfigurable> converted = new ArrayList<IConfigurable>();
+        if (configures == null) {
+            return converted;
+        }
+        for (Configure entry : configures) {
+            IConfigurable configurable = convert(entry);
+            if (configurable != null) {
+                converted.add(configurable);
+            }
+        }
+        return converted;
+    }
+
+    /**
+     * Instantiates the configurable named by the {@code CLASS_NAME} entry of {@code jsonValue} and
+     * populates it.
+     *
+     * <p>Name, namespace and type are set first; an {@link AbstractConfigurable} additionally gets
+     * this service attached; finally the instance is filled from the JSON via {@code toObject}.
+     *
+     * @param namespace namespace to assign
+     * @param type      type to assign
+     * @param name      configure name to assign
+     * @param jsonValue serialized form, expected to carry the implementation class name
+     * @return the populated instance, or {@code null} when {@code jsonValue} is empty or
+     *         {@link #createConfigurable(String)} returns {@code null}
+     */
+    protected IConfigurable createConfigurableFromJson(String namespace, String type, String name, String jsonValue) {
+        if (StringUtil.isEmpty(jsonValue)) {
+            return null;
+        }
+        String implementation = JSONObject.parseObject(jsonValue).getString(CLASS_NAME);
+        IConfigurable instance = createConfigurable(implementation);
+        if (instance == null) {
+            return null;
+        }
+        instance.setConfigureName(name);
+        instance.setNameSpace(namespace);
+        instance.setType(type);
+        if (instance instanceof AbstractConfigurable) {
+            ((AbstractConfigurable)instance).setConfigurableService(this);
+        }
+        instance.toObject(jsonValue);
+        return instance;
+    }
+
+    /**
+     * Extension point: lets callers or subclasses change how a {@link Configure} is turned into
+     * its configurable. This implementation simply delegates to
+     * {@link #convertConfigurable(Configure)}.
+     *
+     * @param configure record to convert
+     * @return whatever {@link #convertConfigurable(Configure)} returns for {@code configure}
+     */
+    protected IConfigurable convert(Configure configure) {
+        IConfigurable converted = convertConfigurable(configure);
+        return converted;
+    }
+
+    /**
+     * Builds the configurable described by {@code configure}.
+     *
+     * <p>The instance is created from the record's JSON value. When it is an {@link Entity}, the
+     * record's id and timestamps are copied onto it. {@link #convertPost(IConfigurable)} is then
+     * applied to the instance.
+     *
+     * @param configure record to convert
+     * @return the configurable, or {@code null} when no instance could be created or any step
+     *         threw (the failure is logged)
+     */
+    protected IConfigurable convertConfigurable(Configure configure) {
+        try {
+            String jsonString = configure.getJsonValue();
+            IConfigurable configurable =
+                createConfigurableFromJson(configure.getNameSpace(), configure.getType(), configure.getName(),
+                    jsonString);
+            if (configurable == null) {
+                // No instance was created; skip post-processing instead of letting it dereference
+                // null and surface as a misleading conversion error in the log.
+                return null;
+            }
+            if (configurable instanceof Entity) {
+                // The IConfigurable interface is also used by third-party packages, so these
+                // accessors could not be added to it; they live on Entity, hence the cast.
+                Entity abs = (Entity)configurable;
+                abs.setId(configure.getId());
+                abs.setGmtCreate(configure.getGmtCreate());
+                abs.setGmtModified(configure.getGmtModified());
+                /*
+                 * abs.setTempKey((configurable.getNameSpace() + configurable.getType() +
+                 * configurable.getConfigureName() + jsonString).hashCode());
+                 */
+            }
+            convertPost(configurable);
+            return configurable;
+        } catch (Exception e) {
+            // String concatenation tolerates a null configure, unlike an explicit toString() call.
+            LOG.error("转换异常:" + configure, e);
+            return null;
+        }
+    }
+
+    /**
+     * Hook for overriding fields of a freshly converted configurable from the service properties.
+     *
+     * <p>The property looked up is the key built from the configurable's namespace, type and
+     * configure name. Its value is read as a comma-separated list of {@code field:value} pairs.
+     * Each pair that splits into exactly two parts is converted to the field's type and written
+     * to the bean; other pairs are skipped. A failure on one pair is logged and does not stop the
+     * remaining pairs.
+     *
+     * @param configurable the object whose fields may be overridden
+     */
+    @SuppressWarnings("rawtypes")
+    protected void convertPost(IConfigurable configurable) {
+        if (this.properties == null) {
+            return;
+        }
+        String propertyKey =
+            MapKeyUtil.createKey(configurable.getNameSpace(), configurable.getType(), configurable.getConfigureName());
+        String overrides = this.properties.getProperty(propertyKey);
+        if (StringUtil.isEmpty(overrides)) {
+            return;
+        }
+        for (String pair : overrides.split(",")) {
+            try {
+                String[] nameAndValue = pair.split(":");
+                if (nameAndValue.length == 2) {
+                    String fieldName = nameAndValue[0];
+                    Class fieldType = ReflectUtil.getBeanFieldType(configurable.getClass(), fieldName);
+                    DataType converter = DataTypeUtil.createDataType(fieldType, null);
+                    if (converter != null) {
+                        Object typedValue = converter.getData(nameAndValue[1]);
+                        ReflectUtil.setBeanFieldValue(configurable, fieldName, typedValue);
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("convert post error " + pair, e);
+            }
+        }
+    }
+
+    /**
+     * Creates a configurable instance for the given class name via
+     * {@code ReflectUtil.forInstance}.
+     *
+     * @param className fully qualified class name of the configurable implementation
+     * @return whatever {@code ReflectUtil.forInstance} yields for {@code className}
+     */
+    @SuppressWarnings("rawtypes")
+    protected IConfigurable createConfigurable(String className) {
+        IConfigurable instance = ReflectUtil.forInstance(className);
+        return instance;
+    }
+
+    /**
+     * Outcome of loading configurables: a success flag plus the loaded objects.
+     */
+    public class GetConfigureResult {
+
+        /** Whether the underlying query succeeded. */
+        private boolean querySuccess;
+        /** The configurables produced by the query. */
+        private List<IConfigurable> configurables;
+
+        public void setQuerySuccess(boolean querySuccess) {
+            this.querySuccess = querySuccess;
+        }
+
+        public boolean isQuerySuccess() {
+            return this.querySuccess;
+        }
+
+        public void setConfigurables(List<IConfigurable> configurables) {
+            this.configurables = configurables;
+        }
+
+        public List<IConfigurable> getConfigurables() {
+            return this.configurables;
+        }
+    }
+
+    /**
+     * @return the value of this service's {@code namespace} field
+     */
+    @Override
+    public String getNamespace() {
+        return this.namespace;
+    }
+
+    /**
+     * @return the properties held by this service; may be {@code null}
+     */
+    public Properties getProperties() {
+        return this.properties;
+    }
+
+    /**
+     * Replaces the properties held by this service.
+     *
+     * @param properties the new properties
+     */
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractSupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractSupportParentConfigureService.java
new file mode 100644
index 0000000..390a119
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/AbstractSupportParentConfigureService.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableListerner;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Configurable service with layered namespaces. Objects stored in the top-level namespace (the
+ * fixed value {@link IConfigurableService#PARENT_CHANNEL_NAME_SPACE}) are visible from every other
+ * namespace: lookups consult the namespace-local delegate first and then the parent delegate.
+ */
+public abstract class AbstractSupportParentConfigureService extends AbstractConfigurableService
+        implements IConfigurableService {
+
+    private static final Log LOG = LogFactory.getLog(AbstractSupportParentConfigureService.class);
+    /** Delegate serving the namespace passed to {@link #initConfigurables(String)}. */
+    protected IConfigurableService configureService = null;
+    /** Delegate serving the top-level namespace; {@code null} once this service serves it itself. */
+    protected IConfigurableService parentConfigureService = null;
+    //protected IConfigurableService shareConfigureService = null;
+    /** Properties handed to {@link #initMethod(Properties)}. */
+    protected Properties properties;
+
+    public AbstractSupportParentConfigureService() {
+        super(null);
+    }
+
+    /**
+     * Stores the properties and lets the subclass create its delegates.
+     *
+     * @param property configuration for this service and its delegates
+     */
+    public void initMethod(Properties property) {
+        this.properties = property;
+        initBeforeInitConfigurable(property);
+    }
+
+    /**
+     * Called from {@link #initMethod(Properties)} before any configurable is loaded; the
+     * implementations visible in this module use it to create {@link #configureService} and
+     * {@link #parentConfigureService}.
+     *
+     * @param property configuration passed to {@link #initMethod(Properties)}
+     */
+    protected abstract void initBeforeInitConfigurable(Properties property);
+
+    /**
+     * Initialises the delegates for {@code namespace}. When {@code namespace} is the top-level
+     * namespace itself, the parent delegate is dropped; otherwise the parent delegate, if any, is
+     * initialised with the top-level namespace first.
+     */
+    @Override
+    public void initConfigurables(String namespace) {
+        if (IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(namespace)) {
+            parentConfigureService = null;
+        } else if (parentConfigureService != null) {
+            // Null guard: an earlier call with the top-level namespace clears the parent delegate.
+            parentConfigureService.initConfigurables(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+        }
+        configureService.initConfigurables(namespace);
+    }
+
+    /**
+     * Refreshes the parent delegate (unless {@code namespace} is the top-level namespace or there
+     * is no parent delegate) and then the namespace-local delegate.
+     *
+     * @return always {@code true}; the delegates' own results are not propagated
+     */
+    @Override
+    public boolean refreshConfigurable(String namespace) {
+        if (parentConfigureService != null
+                && !IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(namespace)) {
+            parentConfigureService.refreshConfigurable(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+            // initShareConfigurableService(namespace);
+        }
+        configureService.refreshConfigurable(namespace);
+        return true;
+    }
+
+    /**
+     * Returns the configurables of {@code type} from the local delegate followed by those of the
+     * parent delegate.
+     *
+     * @return a new list owned by the caller, never {@code null}
+     */
+    @Override
+    public List<IConfigurable> queryConfigurable(String type) {
+        // Always build a fresh list: appending the parent's entries to the list handed out by the
+        // delegate would modify a collection this class does not own.
+        List<IConfigurable> result = new ArrayList<>();
+        List<IConfigurable> own = configureService.queryConfigurable(type);
+        if (own != null) {
+            result.addAll(own);
+        }
+        //if (shareConfigureService != null) {
+        //    List<IConfigurable> share = shareConfigureService.queryConfigurable(type);
+        //    if (share != null) {
+        //        result.addAll(share);
+        //    }
+        //}
+        if (parentConfigureService == null) {
+            return result;
+        }
+        List<IConfigurable> parent = parentConfigureService.queryConfigurable(type);
+        if (parent != null) {
+            result.addAll(parent);
+        }
+        return result;
+    }
+
+    /**
+     * Looks up a configurable by type and name, preferring the local delegate over the parent.
+     *
+     * @return the match, or {@code null} when neither delegate has one
+     */
+    @Override
+    public IConfigurable queryConfigurableByIdent(String type, String name) {
+        IConfigurable configurable = configureService.queryConfigurableByIdent(type, name);
+        if (configurable != null || parentConfigureService == null) {
+            return configurable;
+        }
+        //if (shareConfigureService != null) {
+        //    configurable = shareConfigureService.queryConfigurableByIdent(type, name);
+        //}
+        return parentConfigureService.queryConfigurableByIdent(type, name);
+    }
+
+    /**
+     * Looks up a configurable by its identification string, preferring the local delegate over
+     * the parent.
+     *
+     * @return the match, or {@code null} when neither delegate has one
+     */
+    @Override
+    public IConfigurable queryConfigurableByIdent(String identification) {
+        IConfigurable configurable = configureService.queryConfigurableByIdent(identification);
+        if (configurable != null || parentConfigureService == null) {
+            return configurable;
+        }
+        //if (shareConfigureService != null) {
+        //    configurable = shareConfigureService.queryConfigurableByIdent(identification);
+        //}
+        return parentConfigureService.queryConfigurableByIdent(identification);
+    }
+
+    /**
+     * Inserts into the parent delegate when one exists and the configurable lives in the
+     * top-level namespace; otherwise inserts into the local delegate.
+     */
+    @Override
+    protected void insertConfigurable(IConfigurable configurable) {
+        // Constant-first equals keeps a configurable without a namespace from raising an NPE.
+        if (parentConfigureService != null
+                && IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(configurable.getNameSpace())) {
+            parentConfigureService.insert(configurable);
+        } else {
+            configureService.insert(configurable);
+        }
+    }
+
+    /**
+     * Updates through the parent delegate when one exists and the configurable lives in the
+     * top-level namespace; otherwise updates through the local delegate.
+     */
+    @Override
+    protected void updateConfigurable(IConfigurable configurable) {
+        if (parentConfigureService != null
+                && IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(configurable.getNameSpace())) {
+            parentConfigureService.update(configurable);
+        } else {
+            configureService.update(configurable);
+        }
+    }
+
+    /**
+     * Typed convenience wrapper around {@link #queryConfigurableByIdent(String, String)}.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T queryConfigurable(String configurableType, String name) {
+        // Unchecked by design: the caller chooses T to match the queried type.
+        return (T) queryConfigurableByIdent(configurableType, name);
+    }
+
+    /**
+     * This class loads nothing itself, since all storage access goes through the delegates.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    protected GetConfigureResult loadConfigurable(String namespace) {
+        return null;
+    }
+
+    //protected void initShareConfigurableService(String namespace) {
+    //    if (parentConfigureService == null) {
+    //        return;
+    //    }
+    //    shareConfigureService = new AbstractReadOnlyConfigurableService() {
+    //
+    //        @Override
+    //        public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+    //            refreshConfigurable(namespace);
+    //            return queryConfigurableByType(type);
+    //        }
+    //
+    //        @Override
+    //        protected List<IConfigurable> loadConfigurables(String namespace) {
+    //            List<IConfigurable> parent = parentConfigureService.queryConfigurable(ShareConfiguable.TYPE);
+    //            List<IConfigurable> shareConfigurables = new ArrayList<>();
+    //            if (parent == null) {
+    //                return shareConfigurables;
+    //            }
+    //            for (IConfigurable configurable : parent) {
+    //                ShareConfiguable shareConfiguable = (ShareConfiguable) configurable;
+    //                if (shareConfiguable.getShareAll() || shareConfiguable.getShareNameSpaces().contains(namespace)) {
+    //                    String sharedNameSpace = shareConfiguable.getSharedNameSpace();
+    //                    String sharedType = shareConfiguable.getSharedType();
+    //                    String sharedName = shareConfiguable.getSharedName();
+    //                    List<IConfigurable> sharedConfigrables =
+    //                        createAndQueryConfigurable(sharedNameSpace, sharedType, sharedName);
+    //                    if (sharedConfigrables != null) {
+    //                        shareConfigurables.addAll(sharedConfigrables);
+    //                    }
+    //                }
+    //            }
+    //            return shareConfigurables;
+    //        }
+    //
+    //
+    //    };
+    //    shareConfigureService.refreshConfigurable(namespace);
+    //
+    //}
+
+    /**
+     * Creates a separate service from {@link #properties}, refreshes it for
+     * {@code sharedNameSpace} and queries it.
+     *
+     * @param sharedNameSpace namespace to load
+     * @param sharedType      configurable type to query
+     * @param sharedName      configure name; when empty, all configurables of the type are queried
+     * @return with a name: a new list holding the match, or empty when there is none; without a
+     *         name: whatever the inner service returns for the type (possibly {@code null}); an
+     *         empty list when the inner service could not be created
+     */
+    protected List<IConfigurable> createAndQueryConfigurable(String sharedNameSpace, String sharedType,
+                                                             String sharedName) {
+        IConfigurableService innerSharedConfigurableService =
+                ConfigurableServiceFactory.createConfigurableService(properties);
+        if (innerSharedConfigurableService == null) {
+            // The factory logs the failure and returns null instead of throwing.
+            return new ArrayList<>();
+        }
+        innerSharedConfigurableService.refreshConfigurable(sharedNameSpace);
+        if (StringUtil.isNotEmpty(sharedName)) {
+            List<IConfigurable> configurables = new ArrayList<>();
+            IConfigurable configurable = innerSharedConfigurableService.queryConfigurableByIdent(sharedType, sharedName);
+            if (configurable != null) {
+                // A miss must not put a null element into the list handed to callers.
+                configurables.add(configurable);
+            }
+            return configurables;
+        }
+        return innerSharedConfigurableService.queryConfigurable(sharedType);
+    }
+
+    /**
+     * Collects all configurables of the parent delegate (when present) followed by those of the
+     * local delegate.
+     *
+     * @return a new list, never {@code null}
+     */
+    @Override
+    public Collection<IConfigurable> findAll() {
+        List<IConfigurable> configurables = new ArrayList<>();
+        if (parentConfigureService != null) {
+            Collection<IConfigurable> fromParent = parentConfigureService.findAll();
+            // A plain null check suffices; the former "!= null || size() > 0" threw on null.
+            if (fromParent != null) {
+                configurables.addAll(fromParent);
+            }
+        }
+        Collection<IConfigurable> own = configureService.findAll();
+        if (own != null) {
+            configurables.addAll(own);
+        }
+        return configurables;
+    }
+
+    public IConfigurableService getConfigureService() {
+        return configureService;
+    }
+
+    /**
+     * Loads configurables of {@code type} through the parent delegate (when present) and then
+     * through the local delegate, using each delegate's {@code loadConfigurableFromStorage}.
+     *
+     * @return a new list with the parent's results first, never {@code null}
+     */
+    @Override
+    public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+        List<T> configurables = new ArrayList<>();
+        if (parentConfigureService != null) {
+            Collection<T> fromParent = parentConfigureService.loadConfigurableFromStorage(type);
+            if (fromParent != null) {
+                configurables.addAll(fromParent);
+            }
+        }
+        Collection<T> own = configureService.loadConfigurableFromStorage(type);
+        if (own != null) {
+            configurables.addAll(own);
+        }
+        return configurables;
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServcieType.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServcieType.java
new file mode 100644
index 0000000..f2b72ab
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServcieType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+
+/**
+ * Aliases for the service-name constants declared on {@link IConfigurableService}.
+ *
+ * @deprecated each constant here simply mirrors the one of the same name on
+ *             {@link IConfigurableService}.
+ */
+@Deprecated
+public class ConfigurableServcieType {
+
+    public static final String DEFAULT_SERVICE_NAME = IConfigurableService.DEFAULT_SERVICE_NAME;
+    public static final String MEMORY_SERVICE_NAME = IConfigurableService.MEMORY_SERVICE_NAME;
+    public static final String FILE_SERVICE_NAME = IConfigurableService.FILE_SERVICE_NAME;
+    public static final String HTTP_SERVICE_NAME = IConfigurableService.HTTP_SERVICE_NAME;
+
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServiceFactory.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServiceFactory.java
new file mode 100644
index 0000000..307dcae
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/ConfigurableServiceFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
+
+/**
+ * Factory creating {@link IConfigurableService} instances by service type name.
+ */
+public class ConfigurableServiceFactory {
+    private static final ServiceLoaderComponent<IConfigurableService> configurableServiceLoaderComponent = ServiceLoaderComponent.getInstance(IConfigurableService.class);
+    private static final Log LOG = LogFactory.getLog(ConfigurableServiceFactory.class);
+
+    /**
+     * Creates a service of the type named by the {@code ConfigureFileKey.CONNECT_TYPE} property,
+     * falling back to {@code IConfigurableService.DEFAULT_SERVICE_NAME} when that property is
+     * empty. The service works on a copy of {@code properties}; an
+     * {@link AbstractSupportParentConfigureService} is additionally initialised with that copy.
+     *
+     * @param properties configuration to copy and hand to the service
+     * @return the new service, or {@code null} when anything threw (the failure is logged)
+     */
+    public static IConfigurableService createConfigurableService(Properties properties) {
+        try {
+            Properties copy = new Properties();
+            copy.putAll(properties);
+            String configuredType = copy.getProperty(ConfigureFileKey.CONNECT_TYPE);
+            String type = StringUtil.isEmpty(configuredType)
+                ? IConfigurableService.DEFAULT_SERVICE_NAME
+                : configuredType;
+            IConfigurableService service = getConfigurableServiceType(type);
+            if (service instanceof AbstractSupportParentConfigureService) {
+                AbstractSupportParentConfigureService layered = (AbstractSupportParentConfigureService)service;
+                layered.initMethod(copy);
+            }
+            return service;
+        } catch (Exception e) {
+            LOG.error("create ConfigurableService error", e);
+            return null;
+        }
+    }
+
+    /**
+     * Loads the service registered under {@code type} and returns a new instance of the same
+     * class, created by class name through {@code ReflectUtil.forInstance}.
+     *
+     * @param type registered service name
+     * @return a new instance of the class registered under {@code type}
+     */
+    public static IConfigurableService getConfigurableServiceType(String type) {
+        IConfigurableService registered = configurableServiceLoaderComponent.getService().loadService(type);
+        String implementationName = registered.getClass().getName();
+        return ReflectUtil.forInstance(implementationName);
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileConfigureService.java
new file mode 100644
index 0000000..d251bda
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileConfigureService.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service.impl;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configuable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.configuable.model.Configure;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Configurable service persisting its records as lines of a text file. Each line holds the
+ * columns namespace, type, name, JSON value, creation date, modification date and id, joined by
+ * {@code SIGN}.
+ */
+public class FileConfigureService extends AbstractConfigurableService {
+
+    /** Property key whose value is the path of the configuration file. */
+    public static final String FILE_PATH_NAME = IConfigurableService.FILE_PATH_NAME;
+    private static final Log LOG = LogFactory.getLog(FileConfigureService.class);
+    /** File name used when no path is configured. */
+    private static final String DEFAULT_FILE_NAME = "dipper_configure.cs";
+    /** Separator between the columns of one persisted row. */
+    private static final String SIGN = "&&&&";
+    /** Path of the file all reads and writes go to. */
+    public String fileName;
+
+    public FileConfigureService(Properties properties) {
+        super(properties);
+        initService(properties.getProperty(FILE_PATH_NAME));
+    }
+
+    /**
+     * Resolves {@link #fileName}. An explicit path is used as given; otherwise the default file
+     * name is placed at this class's code-source location, or next to the jar when that location
+     * ends in {@code .jar}.
+     *
+     * @param fileAndPath configured file path; may be empty
+     */
+    protected void initService(String fileAndPath) {
+        if (StringUtil.isEmpty(fileAndPath)) {
+            String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+            if (path.endsWith(".jar")) {
+                // The value is a URL path, which is '/'-separated on every platform. Searching
+                // for File.separator found nothing on Windows and made substring throw.
+                int index = path.lastIndexOf('/');
+                if (index >= 0) {
+                    path = path.substring(0, index);
+                }
+            }
+            fileName = FileUtil.concatFilePath(path, DEFAULT_FILE_NAME);
+        } else {
+            fileName = fileAndPath;
+        }
+        LOG.info("load file from path = " + fileName);
+    }
+
+    /**
+     * Loads and converts all records of {@code namespace} from the file.
+     *
+     * @return a result whose success flag is {@code false} when loading threw; the failure is
+     *         logged
+     */
+    @Override
+    protected GetConfigureResult loadConfigurable(String namespace) {
+        GetConfigureResult result = new GetConfigureResult();
+        try {
+            List<Configure> configures = selectOpening(namespace);
+            List<IConfigurable> configurables = convert(configures);
+            LOG.info("load configure namespace=" + namespace + " count=" + configures.size());
+            result.setConfigurables(configurables);
+            // This flag tells whether the query succeeded; on failure the configuration is not updated.
+            result.setQuerySuccess(true);
+        } catch (Exception e) {
+            result.setQuerySuccess(false);
+            LOG.error("load configurable error ", e);
+        }
+        return result;
+    }
+
+    /**
+     * Reads the file and returns the records belonging to {@code namespace}.
+     */
+    protected List<Configure> selectOpening(String namespace) {
+        List<String> list = loadFileLine(fileName);
+        List<Configure> configures = convert2Configure(list);
+        return filter(configures, namespace);
+    }
+
+    /**
+     * Keeps only the records whose namespace equals {@code namespace}.
+     *
+     * @return a new list; empty when {@code configures} is {@code null}
+     * @throws RuntimeException when {@code configures} is non-null and {@code namespace} is empty
+     */
+    protected List<Configure> filter(List<Configure> configures, String namespace) {
+        if (configures == null) {
+            return new ArrayList<>();
+        }
+        if (StringUtil.isEmpty(namespace)) {
+            throw new RuntimeException("namespace can not empty ");
+        }
+        List<Configure> filterConfigures = new ArrayList<>();
+        for (Configure configure : configures) {
+            if (namespace.equals(configure.getNameSpace())) {
+                filterConfigures.add(configure);
+            }
+        }
+        return filterConfigures;
+    }
+
+    /**
+     * Writes {@code configure} to the file: the first row with the same namespace, type and name
+     * is replaced, otherwise a new row is appended.
+     */
+    @Override
+    protected void insertConfigurable(IConfigurable configure) {
+        if (configure == null) {
+            LOG.warn("insert configure is null");
+            return;
+        }
+        String row = configure2String(configure);
+
+        // loadFileLine never returns null, so no further null handling is needed here.
+        List<String> rows = loadFileLine(fileName);
+        List<Configure> configures = convert2Configure(rows);
+        String newKey =
+                MapKeyUtil.createKey(configure.getNameSpace(), configure.getType(), configure.getConfigureName());
+        boolean isReplace = false;
+        for (int i = 0; i < configures.size(); i++) {
+            Configure c = configures.get(i);
+            String old = MapKeyUtil.createKey(c.getNameSpace(), c.getType(), c.getName());
+            if (old.equals(newKey)) {
+                rows.set(i, row);
+                isReplace = true;
+                break;
+            }
+        }
+        if (!isReplace) {
+            rows.add(row);
+        }
+        writeFile(fileName, rows);
+    }
+
+    /**
+     * Rewrites every row whose namespace, type and name match {@code configure}; rows that do not
+     * match are written back unchanged.
+     */
+    @Override
+    protected void updateConfigurable(IConfigurable configure) {
+        if (configure == null) {
+            LOG.warn("update configure is null");
+            return;
+        }
+
+        // Go through the same load wrapper as insert, so both paths decode rows identically.
+        List<String> rows = loadFileLine(fileName);
+        for (int i = 0; i < rows.size(); i++) {
+            String row = rows.get(i);
+            Configure oldConfigure = convert(row);
+            if (configure.getNameSpace().equals(oldConfigure.getNameSpace()) && configure.getType()
+                    .equals(oldConfigure.getType()) && configure.getConfigureName().equals(oldConfigure.getName())) {
+                rows.set(i, configure2String(configure));
+            }
+        }
+        writeFile(fileName, rows);
+
+    }
+
+    /**
+     * Parses one persisted row into a {@link Configure}. Missing columns and the literal text
+     * {@code "null"} yield {@code null} values.
+     */
+    protected Configure convert(String row) {
+        String[] values = row.split(SIGN);
+        String namespace = getColumnValue(values, 0, "namespace");
+        String type = getColumnValue(values, 1, "type");
+        String name = getColumnValue(values, 2, "name");
+        String jsonValue = getColumnValue(values, 3, "json_value");
+        String createDate = getColumnValue(values, 4, "gmt_create");
+        String modifiedDate = getColumnValue(values, 5, "gmt_modified");
+        String id = getColumnValue(values, 6, "id");
+        Configure configure = new Configure();
+        configure.setNameSpace(namespace);
+        configure.setType(type);
+        configure.setName(name);
+        configure.setJsonValue(jsonValue);
+        configure.setGmtCreate(DateUtil.parse(createDate));
+        // The modification date belongs in gmtModified; it used to overwrite gmtCreate.
+        configure.setGmtModified(DateUtil.parse(modifiedDate));
+        configure.setId((id == null ? null : Long.valueOf(id)));
+
+        return configure;
+    }
+
+    /**
+     * Parses every row with {@link #convert(String)}, preserving order.
+     */
+    protected List<Configure> convert2Configure(List<String> rows) {
+        List<Configure> configures = new ArrayList<Configure>();
+        for (String row : rows) {
+            configures.add(convert(row));
+        }
+        return configures;
+    }
+
+    /**
+     * Reads column {@code i} of a split row.
+     *
+     * @param values    the split row
+     * @param i         zero-based column index
+     * @param namespace column label supplied by callers; not used by this implementation
+     * @return the column text, or {@code null} when the column is absent or holds {@code "null"}
+     */
+    protected String getColumnValue(String[] values, int i, String namespace) {
+        if (values == null || values.length <= i) {
+            return null;
+        }
+        if ("null".equals(values[i])) {
+            return null;
+        }
+        return values[i];
+    }
+
+    /**
+     * Loads the lines of the file into memory and passes them through the decoding hook.
+     *
+     * @param fileName file to read
+     * @return the rows, never {@code null}
+     */
+    protected List<String> loadFileLine(String fileName) {
+        List<String> rows = FileUtil.loadFileLine(fileName);
+        if (rows == null) {
+            rows = new ArrayList<>();
+        }
+        return doDecRowList(rows);
+    }
+
+    /**
+     * Passes the rows through the encoding hook and writes them to the file.
+     */
+    protected void writeFile(String fileName, List<String> rows) {
+        List<String> rowList = doEncryptRowList(rows);
+        FileUtil.write(fileName, rowList);
+    }
+
+    /** Encoding hook applied before writing; currently returns the rows unchanged. */
+    private List<String> doEncryptRowList(List<String> rows) {
+        return rows;
+    }
+
+    /** Decoding hook applied after reading; currently returns the rows unchanged. */
+    private List<String> doDecRowList(List<String> rows) {
+        return rows;
+    }
+
+    /**
+     * Serializes a configurable into one file row. Creation date, modification date and id come
+     * from the configurable when it is an {@link Entity}, otherwise from a fresh {@link Entity}.
+     */
+    protected String configure2String(IConfigurable configure) {
+        Entity entity = null;
+        if (configure instanceof Entity) {
+            entity = (Entity)configure;
+        } else {
+            entity = new Entity();
+        }
+        String row = MapKeyUtil.createKeyBySign(SIGN, configure.getNameSpace(), configure.getType(),
+                configure.getConfigureName(), configure.toJson(), DateUtil.format(entity.getGmtCreate()),
+                DateUtil.format(entity.getGmtModified()), entity.getId() + "");
+        return row;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    /**
+     * Refreshes this service's namespace and then returns the configurables of {@code type}.
+     */
+    @Override
+    public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+        refreshConfigurable(getNamespace());
+        return queryConfigurableByType(type);
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileSupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileSupportParentConfigureService.java
new file mode 100644
index 0000000..bf8d151
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/FileSupportParentConfigureService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service.impl;
+
+import org.apache.rocketmq.streams.configuable.service.AbstractSupportParentConfigureService;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configuable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import com.google.auto.service.AutoService;
+
+import java.util.Properties;
+
+/**
+ * {@link AbstractSupportParentConfigureService} whose {@code configureService} and
+ * {@code parentConfigureService} are both {@link FileConfigureService} instances.
+ * Registered through {@code @AutoService} under {@code ConfigurableServcieType.FILE_SERVICE_NAME}.
+ */
+@AutoService(IConfigurableService.class)
+@ServiceName(ConfigurableServcieType.FILE_SERVICE_NAME)
+public class FileSupportParentConfigureService extends AbstractSupportParentConfigureService {
+
+    /**
+     * Creates the two file-backed services.
+     *
+     * @param property not read by this method
+     */
+    @Override
+    protected void initBeforeInitConfigurable(Properties property) {
+        // NOTE(review): both services are built from the inherited `properties` field rather than
+        // the `property` argument - confirm the caller always passes the same object.
+        this.configureService = new FileConfigureService(properties);
+        this.parentConfigureService = new FileConfigureService(properties);
+    }
+
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemoryConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemoryConfigureService.java
new file mode 100644
index 0000000..e846e44
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemoryConfigureService.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.configuable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+
+/**
+ * {@link AbstractConfigurableService} that keeps configurables in a static, in-process map
+ * keyed by namespace.
+ *
+ * <p>The map is shared by every instance of this class, so all access to it (and to the lists it
+ * holds) is guarded by the map's own monitor. Locking on {@code this} would not exclude other
+ * instances that share the same static map.
+ */
+public class MemoryConfigureService extends AbstractConfigurableService {
+
+    /** namespace -> configurables stored under it; shared by all instances, guarded by its own monitor. */
+    private static final Map<String, List<IConfigurable>> namespace2Configure = new HashMap<>();
+
+    public MemoryConfigureService(Properties properties) {
+        super(properties);
+    }
+
+    /**
+     * Returns fresh copies of the configurables stored under the namespace.
+     *
+     * @param namespace namespace to load
+     * @return a successful result whose configurable list is {@code null} when nothing has been
+     *         stored under the namespace, otherwise one re-created instance per stored configurable
+     */
+    @Override
+    protected GetConfigureResult loadConfigurable(String namespace) {
+        GetConfigureResult result = new GetConfigureResult();
+        result.setQuerySuccess(true);
+
+        // Copy the stored list under the lock so a concurrent insert/update cannot break iteration.
+        List<IConfigurable> snapshot = null;
+        synchronized (namespace2Configure) {
+            List<IConfigurable> stored = namespace2Configure.get(namespace);
+            if (stored != null) {
+                snapshot = new ArrayList<>(stored);
+            }
+        }
+
+        List<IConfigurable> configurableList = null;
+        if (snapshot != null) {
+            configurableList = new ArrayList<>(snapshot.size());
+            for (IConfigurable configurable : snapshot) {
+                // Rebuild each object from its JSON so callers never share the stored instance.
+                IConfigurable tmp = ReflectUtil.forInstance(configurable.getClass());
+                tmp.toObject(configurable.toJson());
+                tmp.setNameSpace(configurable.getNameSpace());
+                tmp.setConfigureName(configurable.getConfigureName());
+                configurableList.add(tmp);
+            }
+        }
+        result.setConfigurables(configurableList);
+        return result;
+    }
+
+    /**
+     * Stores the configurable under its namespace, replacing any entry with the same type and name.
+     *
+     * @param configurable configurable to store; {@code null} is ignored
+     */
+    @Override
+    protected void insertConfigurable(IConfigurable configurable) {
+        if (configurable == null) {
+            return;
+        }
+        if (AbstractConfigurable.class.isInstance(configurable)) {
+            ((AbstractConfigurable)configurable).setConfigurableService(this);
+        }
+
+        String namespace = configurable.getNameSpace();
+        synchronized (namespace2Configure) {
+            List<IConfigurable> list = namespace2Configure.get(namespace);
+            if (list == null) {
+                list = new ArrayList<>();
+                namespace2Configure.put(namespace, list);
+            }
+            int existingIndex = indexOf(list, configurable);
+            if (existingIndex != -1) {
+                list.remove(existingIndex);
+            }
+            list.add(configurable);
+        }
+    }
+
+    /**
+     * Replaces the stored entry that has the same type and name as the given configurable.
+     *
+     * @param configure replacement configurable
+     * @throws RuntimeException when no entry with that type and name exists in the namespace
+     */
+    @Override
+    protected void updateConfigurable(IConfigurable configure) {
+        synchronized (namespace2Configure) {
+            List<IConfigurable> list = namespace2Configure.get(configure.getNameSpace());
+            int existingIndex = (list == null) ? -1 : indexOf(list, configure);
+            if (existingIndex != -1) {
+                list.set(existingIndex, configure);
+                return;
+            }
+        }
+        throw new RuntimeException(
+                "not have exist configure " + configure.getNameSpace() + "," + configure.getType() + ","
+                        + configure.getConfigureName());
+    }
+
+    /**
+     * Refreshes the current namespace and then returns the configurables of the given type.
+     */
+    @Override
+    public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+        refreshConfigurable(getNamespace());
+        return queryConfigurableByType(type);
+    }
+
+    /**
+     * Finds the entry with the same type and name as the target. Callers hold the map lock.
+     *
+     * @return index of the matching entry, or -1 when there is none
+     */
+    private static int indexOf(List<IConfigurable> list, IConfigurable target) {
+        for (int i = 0; i < list.size(); i++) {
+            IConfigurable config = list.get(i);
+            if (config.getType().equals(target.getType()) && config.getConfigureName()
+                    .equals(target.getConfigureName())) {
+                return i;
+            }
+        }
+        return -1;
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemorySupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemorySupportParentConfigureService.java
new file mode 100644
index 0000000..528d087
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configuable/service/impl/MemorySupportParentConfigureService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.service.impl;
+
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.configuable.service.AbstractSupportParentConfigureService;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configuable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import com.google.auto.service.AutoService;
+
+
+/**
+ * {@link AbstractSupportParentConfigureService} whose {@code configureService} and
+ * {@code parentConfigureService} are both {@link MemoryConfigureService} instances.
+ * Registered through {@code @AutoService} under {@code ConfigurableServcieType.MEMORY_SERVICE_NAME}.
+ */
+@AutoService(IConfigurableService.class)
+@ServiceName(ConfigurableServcieType.MEMORY_SERVICE_NAME)
+public class MemorySupportParentConfigureService extends AbstractSupportParentConfigureService {
+
+    /**
+     * Creates the two memory-backed services.
+     *
+     * @param property not read by this method
+     */
+    @Override
+    protected void initBeforeInitConfigurable(Properties property) {
+        //        this.rootConfigureService = new MemoryConfigureService(property);
+        // NOTE(review): both services are built from the inherited `properties` field rather than
+        // the `property` argument - confirm the caller always passes the same object.
+        this.parentConfigureService = new MemoryConfigureService(properties);
+        this.configureService = new MemoryConfigureService(properties);
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
new file mode 100644
index 0000000..91b3171
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/ConfigurableComponent.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.component.ConfigureDescriptor;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.configurable.service.ConfigurableServiceFactory;
+import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
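+/**
+ * Unified management of Configurable objects: unified query, insert and update.
+ * insert/update write the configurable object to storage; file storage (file), in-memory storage
+ * (memory) and database storage (DB) are supported, selected through the
+ * ConfigureFileKey.CONNECT_TYPE key. Queries are served from memory: objects are loaded into
+ * memory periodically, and the loading period (in seconds) can be set in the properties file
+ * through the ConfigureFileKey.POLLING_TIME key. Newly loaded objects take effect once loaded;
+ * objects that already exist are replaced only when their updateFlag changes.
+ */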
+public class ConfigurableComponent extends AbstractComponent<IConfigurableService>
+    implements IConfigurableService {
+
+    private static final Log LOG = LogFactory.getLog(ConfigurableComponent.class);
+
+    protected volatile IConfigurableService configureService = null;
+
+    protected transient String namespace;
+
+    public ConfigurableComponent() {
+        initConfigurableServiceDescriptor();
+        addConfigureDescriptor(
+            new ConfigureDescriptor(CONNECT_TYPE, false, ConfigurableServcieType.DEFAULT_SERVICE_NAME));
+    }
+
+    public static ConfigurableComponent getInstance(String namespace) {
+        return ComponentCreator.getComponent(namespace, ConfigurableComponent.class);
+    }
+
+    @Override
+    protected boolean initProperties(Properties properties) {
+        try {
+            if (configureService != null) {
+                return true;
+            }
+            this.configureService = ConfigurableServiceFactory.createConfigurableService(properties);
+            return true;
+        } catch (Exception e) {
+            LOG.error("ConfigurableComponent create error,properties= " + properties, e);
+            return false;
+        }
+
+    }
+
+    @Override
+    public boolean startComponent(String namespace) {
+        try {
+            this.namespace = namespace;
+            configureService.initConfigurables(namespace);
+            return true;
+        } catch (Exception e) {
+            LOG.error("ConfigurableComponent init error, namespace is " + namespace, e);
+            return false;
+        }
+
+    }
+
+    /**
+     * 启动测试模式,用内存数据库存储和加载configurable数据
+     */
+    public static void begineTestMode() {
+        System.setProperty(ConfigurableComponent.CONNECT_TYPE, ConfigurableServcieType.MEMORY_SERVICE_NAME);
+    }
+
+    /**
+     * 关闭测试模式,用配置文件中配置的属性加载configuable数据
+     */
+    public static void endTestMode() {
+        System.clearProperty(ConfigurableComponent.CONNECT_TYPE);
+    }
+
+    @Override
+    public boolean stop() {
+        return true;
+    }
+
+    @Override
+    public IConfigurableService getService() {
+        return configureService;
+    }
+
+    @Override
+    public void initConfigurables(String namespace) {
+        configureService.initConfigurables(namespace);
+    }
+
+    @Override
+    public boolean refreshConfigurable(String namespace) {
+        return configureService.refreshConfigurable(namespace);
+    }
+
+    public void mockConfigurable(String namespace) {
+        refreshConfigurable(namespace);
+
+    }
+
+    @Override
+    public List<IConfigurable> queryConfigurable(String type) {
+        return configureService.queryConfigurable(type);
+    }
+
+    @Override
+    public <T extends IConfigurable> List<T> queryConfigurableByType(String type) {
+        return configureService.queryConfigurableByType(type);
+    }
+
+    @Override
+    public IConfigurable queryConfigurableByIdent(String type, String name) {
+        return configureService.queryConfigurableByIdent(type, name);
+    }
+
+    @Override
+    public IConfigurable queryConfigurableByIdent(String identification) {
+        return configureService.queryConfigurableByIdent(identification);
+    }
+
+    @Override
+    public void insert(IConfigurable configurable) {
+        configureService.insert(configurable);
+        ConfigurableUtil.refreshMock(configurable);
+    }
+
+    @Override
+    public void update(IConfigurable configurable) {
+        configureService.update(configurable);
+    }
+
+    @Override
+    public <T> Map<String, T> queryConfigurableMapByType(String type) {
+        return configureService.queryConfigurableMapByType(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T queryConfigurable(String configurableType, String name) {
+        return (T)queryConfigurableByIdent(configurableType, name);
+    }
+
+    //protected void insertConfigurable(JSONObject message, IConfigurable configurable) {
+    //    ConfigurableUtil.insertConfigurable(message, configurable, this.configureService);
+    //}
+
+    @Override
+    public String getNamespace() {
+        if (AbstractConfigurableService.class.isInstance(configureService)) {
+            return ((AbstractConfigurableService)configureService).getNamespace();
+        }
+        return namespace;
+    }
+
+    @Override
+    public Collection<IConfigurable> findAll() {
+        return configureService.findAll();
+    }
+
+    @Override
+    public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+        return configureService.loadConfigurableFromStorage(type);
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/model/Configure.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/model/Configure.java
new file mode 100644
index 0000000..bb4531f
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/model/Configure.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.model;
+
+import org.apache.rocketmq.streams.common.model.Entity;
+
+/**
+ * When configurables are stored in a database, this is the object mapped to the database table.
+ */
+public class Configure extends Entity {
+
+    private static final long serialVersionUID = 5668017348345235669L;
+
+    private String nameSpace;
+    private String type;
+    private String name;
+    // private String identification;
+    private String jsonValue;
+    private String modifyTime;
+    private String remark;
+    private int openRange;
+
+    public static String createTableSQL(String tableName) {
+        return "/******************************************/\n"
+            + "/*   TableName = dipper_configure   */\n"
+            + "/******************************************/\n"
+            + "CREATE TABLE IF NOT EXISTS `" + tableName + "` (\n"
+            + "  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',\n"
+            + "  `gmt_create` datetime NOT NULL COMMENT '创建时间',\n"
+            + "  `gmt_modified` datetime NOT NULL COMMENT '修改时间',\n"
+            + "  `namespace` varchar(32) NOT NULL COMMENT '项目标识',\n"
+            + "  `type` varchar(32) NOT NULL COMMENT '配置类型',\n"
+            + "  `name` varchar(128) NOT NULL COMMENT '配置名称',\n"
+            + "  `json_value` text NOT NULL COMMENT '配置内容',\n"
+            + "  `status` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '1:正在使用 0:已失效',\n"
+            + "  PRIMARY KEY (`id`),\n"
+            + "  UNIQUE KEY `uk_namespace_type_name` (`namespace`,`type`,`name`),\n"
+            + "  KEY `idx_namespace` (`namespace`)\n"
+            + ") ENGINE=InnoDB AUTO_INCREMENT=1814834 DEFAULT CHARSET=utf8 COMMENT='统一接入配置项'\n"
+            + ";";
+    }
+
+    public String getNameSpace() {
+        return nameSpace;
+    }
+
+    public void setNameSpace(String nameSpace) {
+        this.nameSpace = nameSpace;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    // public String getIdentification() {
+    // return identification;
+    // }
+
+    // public void createIdentification() {
+    // this.identification = MapKeyUtil.createKey(nameSpace, type, name);
+    // }
+
+    public String getJsonValue() {
+        return jsonValue;
+    }
+
+    public void setJsonValue(String jsonValue) {
+        this.jsonValue = jsonValue;
+    }
+
+    public String getModifyTime() {
+        return modifyTime;
+    }
+
+    public void setModifyTime(String modifyTime) {
+        this.modifyTime = modifyTime;
+    }
+
+    public String getRemark() {
+        return remark;
+    }
+
+    public void setRemark(String remark) {
+        this.remark = remark;
+    }
+
+    public int getOpenRange() {
+        return openRange;
+    }
+
+    public void setOpenRange(int openRange) {
+        this.openRange = openRange;
+    }
+
+    @Override
+    public String toString() {
+        return "Configure{" + "nameSpace='" + nameSpace + '\'' + ", type='" + type + '\'' + ", name='" + name + '\''
+            + ", jsonValue='" + jsonValue + '\'' + ", modifyTime='" + modifyTime + '\'' + ", remark='" + remark + '\''
+            + ", openRange=" + openRange + '}';
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
new file mode 100644
index 0000000..85ef626
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.datatype.DataType;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.common.utils.*;
+import org.apache.rocketmq.streams.configurable.model.Configure;
+
+import java.util.*;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractConfigurableService implements IConfigurableService {
+
+    private static final Log LOG = LogFactory.getLog(AbstractConfigurableService.class);
+
+    private static final String CLASS_NAME = IConfigurableService.CLASS_NAME;
+
+    // Lists of configurables, looked up with MapKeyUtil.createKey(type).
+    protected Map<String, List<IConfigurable>> type2ConfigurableMap = new HashMap<>();
+
+    // Single configurables, looked up with MapKeyUtil.createKey(type, name).
+    protected Map<String, IConfigurable> name2ConfigurableMap = new HashMap<>();
+
+    // Cache keyed by getConfigureKey(namespace, type, name); written by updateConfiguresCache.
+    protected Map<String, IConfigurable> configurableMap = new HashMap<>();
+
+    // Service properties; stays null when the no-argument constructor is used.
+    protected Properties properties;
+
+    // Namespace passed to the most recent refreshConfigurable call.
+    protected transient String namespace;
+
+    /**
+     * @param properties service properties; stored as-is
+     */
+    public AbstractConfigurableService(Properties properties) {
+        this.properties = properties;
+    }
+
+    /**
+     * Creates a service without properties; the {@code properties} field stays {@code null}.
+     */
+    public AbstractConfigurableService() {
+    }
+
+    /**
+     * Looks a configurable up by a ready-made key of {@code name2ConfigurableMap}.
+     *
+     * @param identification key used directly for the map lookup
+     * @return the mapped configurable, or {@code null} when the key is absent
+     */
+    @Override
+    public IConfigurable queryConfigurableByIdent(String identification) {
+        return name2ConfigurableMap.get(identification);
+    }
+
+    /**
+     * Builds the {@code configurableMap} key from namespace, type and name via
+     * {@code MapKeyUtil.createKey}.
+     */
+    protected String getConfigureKey(String nameSpace, String type, String name) {
+        return MapKeyUtil.createKey(nameSpace, type, name);
+    }
+
+    /**
+     * Puts one configurable into {@code configurableMap} under its namespace/type/name key.
+     *
+     * @param configurable configurable to cache; {@code null} is ignored
+     */
+    protected void updateConfiguresCache(IConfigurable configurable) {
+        if (configurable != null) {
+            // The return value is deliberately unused; the call is kept for whatever it triggers.
+            configurable.toJson();
+            configurableMap.put(
+                getConfigureKey(configurable.getNameSpace(), configurable.getType(), configurable.getConfigureName()),
+                configurable);
+        }
+    }
+
+    /**
+     * Caches every configurable of the list, one at a time.
+     *
+     * @param configureList configurables to cache
+     */
+    protected void updateConfiguresCache(List<IConfigurable> configureList) {
+        Iterator<IConfigurable> each = configureList.iterator();
+        while (each.hasNext()) {
+            updateConfiguresCache(each.next());
+        }
+    }
+
+    /**
+     * Compares the cached configurable stored under {@code key} with the entry of the new list
+     * that produces the same key.
+     *
+     * @param key              cache key built by {@code getConfigureKey}
+     * @param newConfigureList candidates; every element is cast to {@link IConfigurable}
+     * @return the result of {@code ConfigurableUtil.compare} for the first candidate with that key
+     *         while a cached value exists; {@code false} otherwise
+     */
+    protected boolean equals(String key, List<?> newConfigureList) {
+        for (Object candidate : newConfigureList) {
+            IConfigurable configure = (IConfigurable)candidate;
+            String candidateKey =
+                getConfigureKey(configure.getNameSpace(), configure.getType(), configure.getConfigureName());
+            if (!key.equals(candidateKey)) {
+                continue;
+            }
+            IConfigurable cached = configurableMap.get(key);
+            if (cached != null) {
+                return ConfigurableUtil.compare(cached, configure);
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Typed view of {@code queryConfigurable(type)}.
+     *
+     * @param type configurable type
+     * @return a new list holding the configurables of that type, each cast (unchecked) to
+     *         {@code T}; empty when none are registered
+     */
+    @Override
+    public <T extends IConfigurable> List<T> queryConfigurableByType(String type) {
+        List<IConfigurable> found = queryConfigurable(type);
+        List<T> typed = new ArrayList<T>();
+        if (found != null) {
+            for (IConfigurable configurable : found) {
+                typed.add((T)configurable);
+            }
+        }
+        return typed;
+    }
+
+    /**
+     * Reloads the configurables of a namespace and swaps them into the lookup maps.
+     *
+     * @param namespace namespace to reload; also remembered in the {@code namespace} field
+     * @return {@code true} when the load succeeded and returned a non-null configurable list,
+     *         {@code false} otherwise (the lookup maps are then left untouched)
+     */
+    @Override
+    public boolean refreshConfigurable(String namespace) {
+        this.namespace = namespace;
+        Map<String, List<IConfigurable>> tempType2ConfigurableMap = new HashMap<>();
+        Map<String, IConfigurable> tempName2ConfigurableMap = new HashMap<>();
+        GetConfigureResult configures = loadConfigurable(namespace);
+        if (configures == null || !configures.isQuerySuccess() || configures.getConfigurables() == null) {
+            return false;
+        }
+
+        List<IConfigurable> loaded = configures.getConfigurables();
+        List<IConfigurable> changed = checkAndUpdateConfigurables(namespace, loaded,
+            tempType2ConfigurableMap, tempName2ConfigurableMap,
+            configures.getConfigurables());
+        // Notify only the configurables that checkAndUpdateConfigurables reported as updated.
+        for (IConfigurable configurable : changed) {
+            if (configurable instanceof IAfterConfiguableRefreshListerner) {
+                ((IAfterConfiguableRefreshListerner)configurable).doProcessAfterRefreshConfigurable(this);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Typed variant of {@code queryConfigurableByIdent(type, name)}; the cast to {@code T} is unchecked.
+     *
+     * @param configurableType configurable type
+     * @param name             configurable name
+     * @return the matching configurable cast to {@code T}, or {@code null} when none is registered
+     */
+    @Override
+    public <T> T queryConfigurable(String configurableType, String name) {
+        return (T)queryConfigurableByIdent(configurableType, name);
+    }
+
+    /**
+     * Runs every loaded configurable through {@code update(...)}, destroys configurables that are
+     * no longer present and then replaces the lookup maps with the temporary ones.
+     *
+     * @param namespace                namespace being refreshed (not read here)
+     * @param configurables            freshly loaded configurables
+     * @param tempType2ConfigurableMap map that becomes {@code type2ConfigurableMap}
+     * @param tempName2ConfigurableMap map that becomes {@code name2ConfigurableMap}
+     * @param configures               not read here
+     * @return the configurables for which {@code update(...)} returned {@code true}
+     */
+    protected List<IConfigurable> checkAndUpdateConfigurables(String namespace, List<IConfigurable> configurables,
+                                                              Map<String, List<IConfigurable>> tempType2ConfigurableMap,
+                                                              Map<String, IConfigurable> tempName2ConfigurableMap,
+                                                              List configures) {
+        List<IConfigurable> changed = new ArrayList<>();
+        for (IConfigurable configurable : configurables) {
+            try {
+                if (update(configurable, tempName2ConfigurableMap, tempType2ConfigurableMap)) {
+                    changed.add(configurable);
+                }
+            } catch (Exception e) {
+                // One failing configurable is logged and skipped; the others are still processed.
+                LOG.error("组件初始化异常:" + e.getMessage() + ",name=" + configurable.getConfigureName(), e);
+            }
+        }
+        // Destroy the leftovers while the old name map is still installed, then swap both maps.
+        destroyOldConfigurables(tempName2ConfigurableMap);
+        this.name2ConfigurableMap = tempName2ConfigurableMap;
+        this.type2ConfigurableMap = tempType2ConfigurableMap;
+        return changed;
+    }
+
+    /**
+     * Destroys every currently registered configurable whose key is missing from the new name map.
+     *
+     * @param tempName2ConfigurableMap name map produced by the refresh in progress
+     */
+    private void destroyOldConfigurables(Map<String, IConfigurable> tempName2ConfigurableMap) {
+        for (Map.Entry<String, IConfigurable> current : this.name2ConfigurableMap.entrySet()) {
+            boolean stillPresent = tempName2ConfigurableMap.containsKey(current.getKey());
+            if (!stillPresent) {
+                destroyOldConfigurable(current.getValue());
+            }
+        }
+    }
+
+    /**
+     * Destroys a replaced or removed configurable and drops it from {@code configurableMap}.
+     *
+     * @param oldConfigurable configurable being retired
+     */
+    private void destroyOldConfigurable(IConfigurable oldConfigurable) {
+        if (oldConfigurable instanceof AbstractConfigurable) {
+            ((AbstractConfigurable)oldConfigurable).destroy();
+        }
+        configurableMap.remove(getConfigureKey(oldConfigurable.getNameSpace(), oldConfigurable.getType(),
+            oldConfigurable.getConfigureName()));
+    }
+
+    /**
+     * Hands this service to the configurable (when it is an {@link AbstractConfigurable}) and
+     * then calls its {@code init()}.
+     *
+     * @param configurable configurable to initialise
+     */
+    protected void initConfigurable(IConfigurable configurable) {
+        if (configurable instanceof AbstractConfigurable) {
+            ((AbstractConfigurable)configurable).setConfigurableService(this);
+        }
+        configurable.init();
+    }
+
+    /**
+     * For internal use only.
+     */
+    private ScheduledExecutorService scheduledExecutorService;
+
+    /**
+     * Loads the namespace once and, when a positive polling time is configured, keeps refreshing
+     * it in the background.
+     *
+     * <p>The polling time is read from the {@code AbstractComponent.POLLING_TIME} property and is
+     * in seconds. A previously started poller is shut down before a new one is created, so
+     * repeated calls do not accumulate thread pools.
+     *
+     * @param namespace namespace to load and poll
+     * @throws NumberFormatException when the configured polling time is not a valid long
+     */
+    @Override
+    public void initConfigurables(final String namespace) {
+        refreshConfigurable(namespace);
+
+        long pollingTime = -1;
+        if (this.properties != null) {
+            String pollingTimeStr = this.properties.getProperty(AbstractComponent.POLLING_TIME);
+            if (StringUtil.isNotEmpty(pollingTimeStr)) {
+                pollingTime = Long.parseLong(pollingTimeStr);
+            }
+        }
+        if (pollingTime <= 0) {
+            return;
+        }
+
+        // Retire the poller of an earlier call; shutdown() also cancels its periodic task.
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdown();
+        }
+        // Only one periodic task is ever scheduled on this pool, so one thread is enough.
+        scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    refreshConfigurable(namespace);
+                } catch (Exception e) {
+                    // Swallow after logging: an exception escaping run() would stop the polling.
+                    LOG.error("Load configurables error:" + e.getMessage(), e);
+                }
+            }
+        }, pollingTime, pollingTime, TimeUnit.SECONDS);
+    }
+    // @Override
+    // public List<IConfigurable> queryConfigurable(String nameSpace) {
+    // return namespace2ConfigurableMap.get(nameSpace);
+    // }
+
+    /**
+     * @param type configurable type
+     * @return the list registered for that type, or {@code null} when there is none
+     */
+    @Override
+    public List<IConfigurable> queryConfigurable(String type) {
+        return type2ConfigurableMap.get(MapKeyUtil.createKey(type));
+    }
+
+    /**
+     * Returns the configurable registered under the given type and name.
+     *
+     * @param type the configurable type
+     * @param name the configure name
+     * @return the entry stored in {@code name2ConfigurableMap} for the (type, name) key
+     */
+    @Override
+    public IConfigurable queryConfigurableByIdent(String type, String name) {
+        return name2ConfigurableMap.get(MapKeyUtil.createKey(type, name));
+    }
+
+    /**
+     * Loads the configuration of a namespace.
+     *
+     * @param namespace the namespace to load
+     * @return the load result; callers in this class read its {@code querySuccess} flag and its
+     *         {@code configurables} list
+     */
+    protected abstract GetConfigureResult loadConfigurable(String namespace);
+
+    /**
+     * Updates a configurable by delegating to {@link #updateConfigurable(IConfigurable)}.
+     *
+     * @param configurable the configurable to update
+     */
+    @Override
+    public void update(IConfigurable configurable) {
+        // update(configurable,name2ConfigurableMap,type2ConfigurableMap);
+        updateConfigurable(configurable);
+    }
+
+    /**
+     * Implementation hook invoked by {@link #update(IConfigurable)}.
+     *
+     * @param configurable the configurable to update
+     */
+    protected abstract void updateConfigurable(IConfigurable configurable);
+
+    /**
+     * Implementation hook invoked by {@link #insert(IConfigurable)}.
+     *
+     * @param configurable the configurable to insert
+     */
+    protected abstract void insertConfigurable(IConfigurable configurable);
+
+    /**
+     * Registers a configurable in the supplied maps, initializing it when it is new or changed.
+     *
+     * <p>Note that the "already known" lookup is made against this service's own
+     * {@code name2ConfigurableMap} field, while the registration is written into the maps passed
+     * as parameters (which shadow the fields of the same name).
+     *
+     * @param configurable         the configurable to register; {@code null} is ignored
+     * @param name2ConfigurableMap target map keyed by (type, name)
+     * @param type2ConfigurableMap target map keyed by type
+     * @return {@code true} if the configurable was initialized (new or changed), {@code false} if
+     *         it was {@code null} or the previously registered instance was kept
+     */
+    protected boolean update(IConfigurable configurable, Map<String, IConfigurable> name2ConfigurableMap,
+                             Map<String, List<IConfigurable>> type2ConfigurableMap) {
+        if (configurable == null) {
+            return false;
+        }
+
+        boolean isUpdate = false;
+        // Single-element list, only needed as the argument of equals(...) below.
+        List<IConfigurable> configurableList = new ArrayList<>();
+        configurableList.add(configurable);
+
+        String nameKey = MapKeyUtil.createKey(configurable.getType(), configurable.getConfigureName());
+        if (this.name2ConfigurableMap.containsKey(nameKey)) {
+            String configureKey = getConfigureKey(namespace, configurable.getType(), configurable.getConfigureName());
+            IConfigurable oldConfigurable = this.name2ConfigurableMap.get(nameKey);
+            // NOTE(review): equals(...) is defined outside this view; presumably it compares the
+            // incoming configurable with the cached state for configureKey - confirm.
+            if (equals(configureKey, configurableList)) {
+                // Unchanged: keep the already-initialized instance instead of the new one.
+                configurable = oldConfigurable;
+                // name2ConfigurableMap.put(nameKey, name2ConfigurableMap.get(nameKey));
+            } else {
+                // Changed: tear down the old instance before initializing its replacement.
+                destroyOldConfigurable(oldConfigurable);
+                initConfigurable(configurable);
+                isUpdate = true;
+            }
+        } else {
+            initConfigurable(configurable);
+            isUpdate = true;
+        }
+        updateConfiguresCache(configurable);
+        name2ConfigurableMap.put(nameKey, configurable);
+        String typeKey = MapKeyUtil.createKey(configurable.getType());
+        // put2Map(namespace2ConfigurableMap, namespace, configurable);
+        put2Map(type2ConfigurableMap, typeKey, configurable);
+        return isUpdate;
+    }
+
+    /**
+     * Inserts a configurable by delegating to {@link #insertConfigurable(IConfigurable)}.
+     *
+     * @param configurable the configurable to insert
+     */
+    @Override
+    public void insert(IConfigurable configurable) {
+        // update(configurable,name2ConfigurableMap,type2ConfigurableMap);
+        insertConfigurable(configurable);
+    }
+
+    /**
+     * Entry point for querying across namespaces: loads every given namespace through
+     * {@link #loadConfigurable(String)} and returns all loaded configurables in one list.
+     *
+     * @param namespaces the namespaces to load; {@code null} or empty yields an empty list
+     * @return the configurables of all namespaces, in the order the namespaces were given
+     * @throws RuntimeException if loading any namespace fails, including when the loader returns
+     *                          no result at all
+     */
+    public List<IConfigurable> queryConfiguableByNamespace(String... namespaces) {
+        List<IConfigurable> configurables = new ArrayList<>();
+        if (namespaces == null || namespaces.length == 0) {
+            return configurables;
+        }
+        for (String namespace : namespaces) {
+            GetConfigureResult result = loadConfigurable(namespace);
+            // An implementation may return null; report that as a failed load for this
+            // namespace instead of failing with a message-less NullPointerException.
+            if (result == null || !result.querySuccess) {
+                throw new RuntimeException("Load configurable error, the namespace is " + namespace);
+            }
+            if (result.configurables != null && result.configurables.size() > 0) {
+                configurables.addAll(result.configurables);
+            }
+        }
+        return configurables;
+
+    }
+
+    /**
+     * Appends a configurable to the list stored under {@code key} in a map of lists. When the map
+     * has no list for the key yet, a new list is created and registered first.
+     *
+     * @param map          the map whose values are lists of configurables
+     * @param key          the key of the list to append to
+     * @param configurable the configurable to append
+     */
+    protected void put2Map(Map<String, List<IConfigurable>> map, String key, IConfigurable configurable) {
+        List<IConfigurable> bucket = map.get(key);
+        if (bucket != null) {
+            bucket.add(configurable);
+            return;
+        }
+        bucket = new ArrayList<IConfigurable>();
+        map.put(key, bucket);
+        bucket.add(configurable);
+    }
+
+    /**
+     * Returns every configurable currently registered by name.
+     *
+     * @return the {@code values()} of {@code name2ConfigurableMap}, returned as is (not copied)
+     */
+    @Override
+    public Collection<IConfigurable> findAll() {
+        return name2ConfigurableMap.values();
+    }
+
+    /**
+     * Converts a configurable into its {@link Configure} representation.
+     *
+     * <p>Type, name and namespace are copied over. If the configurable serializes to a non-empty
+     * JSON string, the implementation class name is added to that JSON under {@code CLASS_NAME}
+     * and the result becomes the configure's JSON value.
+     *
+     * @param configurable the configurable to convert
+     * @return the populated configure
+     */
+    protected Configure createConfigure(IConfigurable configurable) {
+        Configure result = new Configure();
+        result.setType(configurable.getType());
+        result.setName(configurable.getConfigureName());
+        result.setNameSpace(configurable.getNameSpace());
+
+        String serialized = configurable.toJson();
+        if (StringUtil.isEmpty(serialized)) {
+            return result;
+        }
+        JSONObject payload = JSONObject.parseObject(serialized);
+        payload.put(CLASS_NAME, configurable.getClass().getName());
+        result.setJsonValue(payload.toJSONString());
+        return result;
+    }
+
+    /**
+     * Returns the configurables of a type as a map keyed by configure name.
+     *
+     * @param type the configurable type
+     * @param <T>  the type the caller expects the configurables to have (unchecked cast)
+     * @return a new map from configure name to configurable; empty when none are registered
+     */
+    @Override
+    public <T> Map<String, T> queryConfigurableMapByType(String type) {
+        Map<String, T> byName = new HashMap<String, T>();
+        List<IConfigurable> ofType = queryConfigurable(type);
+        if (ofType != null) {
+            for (IConfigurable item : ofType) {
+                byName.put(item.getConfigureName(), (T)item);
+            }
+        }
+        return byName;
+    }
+
+    /**
+     * Converts a list of configures into configurables, skipping every entry whose conversion
+     * yields {@code null}.
+     *
+     * @param configures the configures to convert; may be {@code null}
+     * @return a new list with the successfully converted configurables, never {@code null}
+     */
+    protected List<IConfigurable> convert(List<Configure> configures) {
+        List<IConfigurable> converted = new ArrayList<IConfigurable>();
+        if (configures == null) {
+            return converted;
+        }
+        for (Configure source : configures) {
+            IConfigurable candidate = convert(source);
+            if (candidate == null) {
+                continue;
+            }
+            converted.add(candidate);
+        }
+        return converted;
+    }
+
+    /**
+     * Instantiates the class named under {@code CLASS_NAME} in the JSON and populates it from the
+     * JSON value.
+     *
+     * @param namespace the namespace to set on the configurable
+     * @param type      the type to set on the configurable
+     * @param name      the configure name to set on the configurable
+     * @param jsonValue the serialized configurable
+     * @return the populated configurable, or {@code null} when the JSON is empty or no instance
+     *         could be created
+     */
+    protected IConfigurable createConfigurableFromJson(String namespace, String type, String name, String jsonValue) {
+        if (StringUtil.isEmpty(jsonValue)) {
+            return null;
+        }
+        String implementationName = JSONObject.parseObject(jsonValue).getString(CLASS_NAME);
+        IConfigurable instance = createConfigurable(implementationName);
+        if (instance == null) {
+            return null;
+        }
+        instance.setConfigureName(name);
+        instance.setNameSpace(namespace);
+        instance.setType(type);
+        if (instance instanceof AbstractConfigurable) {
+            ((AbstractConfigurable)instance).setConfigurableService(this);
+        }
+        // Identity and service reference are set before the JSON is applied.
+        instance.toObject(jsonValue);
+        return instance;
+    }
+
+    /**
+     * Extension point: lets subclasses change the configurable that is produced for a configure.
+     * This implementation simply delegates to {@link #convertConfigurable(Configure)}.
+     *
+     * @param configure the configure to convert
+     * @return the converted configurable, or {@code null} if the conversion failed
+     */
+    protected IConfigurable convert(Configure configure) {
+
+        return convertConfigurable(configure);
+    }
+
+    /**
+     * Converts a configure into a configurable.
+     *
+     * <p>The configurable is created from the configure's JSON value. If it is an {@link Entity},
+     * the id and the creation/modification timestamps are copied over as well. Finally
+     * {@link #convertPost(IConfigurable)} gets the chance to apply property overrides.
+     *
+     * @param configure the configure to convert
+     * @return the configurable; {@code null} when the configure carries no usable JSON value or
+     *         the conversion throws (the exception is logged)
+     */
+    protected IConfigurable convertConfigurable(Configure configure) {
+        try {
+            String jsonString = configure.getJsonValue();
+            IConfigurable configurable =
+                createConfigurableFromJson(configure.getNameSpace(), configure.getType(), configure.getName(),
+                    jsonString);
+            if (configurable == null) {
+                // Nothing was created, so there is nothing to post-process. Returning here keeps
+                // convertPost from dereferencing null and logging a misleading conversion error.
+                return null;
+            }
+            if (configurable instanceof Entity) {
+                // IConfigurable is also used by third-party packages, so these setters live on the
+                // abstract class rather than on the interface; hence the cast.
+                Entity abs = (Entity)configurable;
+                abs.setId(configure.getId());
+                abs.setGmtCreate(configure.getGmtCreate());
+                abs.setGmtModified(configure.getGmtModified());
+            }
+            convertPost(configurable);
+            return configurable;
+        } catch (Exception e) {
+            LOG.error("转换异常:" + configure.toString(), e);
+            return null;
+        }
+    }
+
+    /**
+     * Hook for adjusting the fields of a freshly converted configurable.
+     *
+     * <p>This implementation looks up a property whose key is built from the configurable's
+     * namespace, type and name. Its value is read as a comma-separated list of
+     * {@code fieldName:value} pairs; each well-formed pair is converted to the field's type and
+     * written into the configurable. Malformed pairs and fields without a matching data type are
+     * skipped, and a failure on one pair is logged without affecting the others.
+     *
+     * @param configurable the configurable to adjust
+     */
+    @SuppressWarnings("rawtypes")
+    protected void convertPost(IConfigurable configurable) {
+        if (this.properties == null) {
+            return;
+        }
+        String overrideKey =
+            MapKeyUtil.createKey(configurable.getNameSpace(), configurable.getType(), configurable.getConfigureName());
+        String overrides = this.properties.getProperty(overrideKey);
+        if (StringUtil.isEmpty(overrides)) {
+            return;
+        }
+        for (String assignment : overrides.split(",")) {
+            try {
+                String[] nameAndValue = assignment.split(":");
+                if (nameAndValue.length != 2) {
+                    continue;
+                }
+                String fieldName = nameAndValue[0];
+                Class fieldType = ReflectUtil.getBeanFieldType(configurable.getClass(), fieldName);
+                DataType dataType = DataTypeUtil.createDataType(fieldType, null);
+                if (dataType != null) {
+                    ReflectUtil.setBeanFieldValue(configurable, fieldName, dataType.getData(nameAndValue[1]));
+                }
+            } catch (Exception e) {
+                LOG.error("convert post error " + assignment, e);
+            }
+        }
+    }
+
+    /**
+     * Creates a configurable instance of the given class via {@code ReflectUtil.forInstance}.
+     *
+     * @param className the fully qualified class name of the configurable
+     * @return whatever {@code ReflectUtil.forInstance} returns for the class name
+     */
+    @SuppressWarnings("rawtypes")
+    protected IConfigurable createConfigurable(String className) {
+        return ReflectUtil.forInstance(className);
+    }
+
+    /**
+     * Result of loading the configuration of a namespace: a success flag plus the configurables
+     * that were loaded.
+     */
+    public class GetConfigureResult {
+
+        // Whether the load succeeded; a false value is treated as an error by queryConfiguableByNamespace.
+        private boolean querySuccess;
+        // The loaded configurables; may be null (callers check before use).
+        private List<IConfigurable> configurables;
+
+        public boolean isQuerySuccess() {
+            return querySuccess;
+        }
+
+        public void setQuerySuccess(boolean querySuccess) {
+            this.querySuccess = querySuccess;
+        }
+
+        public List<IConfigurable> getConfigurables() {
+            return configurables;
+        }
+
+        public void setConfigurables(List<IConfigurable> configurables) {
+            this.configurables = configurables;
+        }
+    }
+
+    /**
+     * @return the namespace held by this service
+     */
+    @Override
+    public String getNamespace() {
+        return namespace;
+    }
+
+    /**
+     * @return the properties held by this service; may be {@code null} (the methods of this class
+     *         check for that before reading them)
+     */
+    public Properties getProperties() {
+        return properties;
+    }
+
+    /**
+     * Replaces the properties this service reads its polling interval and field overrides from.
+     *
+     * @param properties the new properties
+     */
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java
new file mode 100644
index 0000000..d2d26cd
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractSupportParentConfigureService.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Configurable service with layered namespaces: besides the requested namespace there is a
+ * top-level (parent) namespace whose objects are visible from every namespace. The parent
+ * namespace is the fixed value {@code IConfigurableService.PARENT_CHANNEL_NAME_SPACE}.
+ *
+ * <p>All work is delegated to {@code configureService} (requested namespace) and, when present,
+ * to {@code parentConfigureService} (parent namespace). A {@code null} parent delegate means
+ * "no parent layer". Both delegates are expected to be assigned by subclasses - presumably in
+ * {@link #initBeforeInitConfigurable(Properties)}; confirm against the implementations.
+ */
+public abstract class AbstractSupportParentConfigureService extends AbstractConfigurableService
+    implements IConfigurableService {
+
+    private static final Log LOG = LogFactory.getLog(AbstractSupportParentConfigureService.class);
+    protected IConfigurableService configureService = null;
+    protected IConfigurableService parentConfigureService = null;
+    //protected IConfigurableService shareConfigureService = null;
+    protected Properties properties;
+
+    public AbstractSupportParentConfigureService() {
+        super(null);
+    }
+
+    /**
+     * Stores the properties and lets the subclass prepare itself before configurables are loaded.
+     *
+     * @param property the properties this service is configured with
+     */
+    public void initMethod(Properties property) {
+        this.properties = property;
+        initBeforeInitConfigurable(property);
+    }
+
+    /**
+     * Subclass hook invoked by {@link #initMethod(Properties)}.
+     *
+     * @param property the properties this service is configured with
+     */
+    protected abstract void initBeforeInitConfigurable(Properties property);
+
+    /**
+     * Initializes the parent layer (unless the requested namespace is the parent namespace itself,
+     * in which case the parent delegate is dropped) and then the requested namespace.
+     *
+     * @param namespace the namespace to initialize
+     */
+    @Override
+    public void initConfigurables(String namespace) {
+        if (IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(namespace)) {
+            // The requested namespace is the top level: there is no layer above it.
+            parentConfigureService = null;
+        } else if (parentConfigureService != null) {
+            parentConfigureService.initConfigurables(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+        }
+        configureService.initConfigurables(namespace);
+    }
+
+    /**
+     * Refreshes the parent layer (when there is one and the namespace is not the parent namespace)
+     * and then the requested namespace.
+     *
+     * @param namespace the namespace to refresh
+     * @return always {@code true}; the results of the delegates are not propagated
+     */
+    @Override
+    public boolean refreshConfigurable(String namespace) {
+        // The parent delegate is null after initConfigurables was called for the parent namespace.
+        if (parentConfigureService != null
+            && !IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(namespace)) {
+            parentConfigureService.refreshConfigurable(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+        }
+        configureService.refreshConfigurable(namespace);
+        return true;
+    }
+
+    /**
+     * Returns the configurables of a type from the requested namespace followed by those of the
+     * parent namespace.
+     *
+     * @param type the configurable type
+     * @return a new list; never {@code null}
+     */
+    @Override
+    public List<IConfigurable> queryConfigurable(String type) {
+        // Always build a fresh list: appending the parent's entries to the list returned by the
+        // delegate would modify the delegate's own list on every query.
+        List<IConfigurable> result = new ArrayList<>();
+        List<IConfigurable> own = configureService.queryConfigurable(type);
+        if (own != null) {
+            result.addAll(own);
+        }
+        if (parentConfigureService == null) {
+            return result;
+        }
+        List<IConfigurable> parent = parentConfigureService.queryConfigurable(type);
+        if (parent != null) {
+            result.addAll(parent);
+        }
+        return result;
+    }
+
+    /**
+     * Looks a configurable up by type and name, falling back to the parent namespace.
+     *
+     * @param type the configurable type
+     * @param name the configure name
+     * @return the configurable, or {@code null} if neither layer knows it
+     */
+    @Override
+    public IConfigurable queryConfigurableByIdent(String type, String name) {
+        IConfigurable configurable = configureService.queryConfigurableByIdent(type, name);
+        if (configurable != null || parentConfigureService == null) {
+            return configurable;
+        }
+        return parentConfigureService.queryConfigurableByIdent(type, name);
+    }
+
+    /**
+     * Looks a configurable up by its identification, falling back to the parent namespace.
+     *
+     * @param identification the identification of the configurable
+     * @return the configurable, or {@code null} if neither layer knows it
+     */
+    @Override
+    public IConfigurable queryConfigurableByIdent(String identification) {
+        IConfigurable configurable = configureService.queryConfigurableByIdent(identification);
+        if (configurable != null || parentConfigureService == null) {
+            return configurable;
+        }
+        return parentConfigureService.queryConfigurableByIdent(identification);
+    }
+
+    /**
+     * Inserts into the parent layer when the configurable belongs to the parent namespace and a
+     * parent delegate exists; otherwise into the requested namespace's delegate.
+     */
+    @Override
+    protected void insertConfigurable(IConfigurable configurable) {
+        // Constant-first comparison tolerates a configurable without a namespace.
+        if (parentConfigureService != null
+            && IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(configurable.getNameSpace())) {
+            parentConfigureService.insert(configurable);
+        } else {
+            configureService.insert(configurable);
+        }
+    }
+
+    /**
+     * Updates in the parent layer when the configurable belongs to the parent namespace and a
+     * parent delegate exists; otherwise in the requested namespace's delegate.
+     */
+    @Override
+    protected void updateConfigurable(IConfigurable configurable) {
+        // Constant-first comparison tolerates a configurable without a namespace.
+        if (parentConfigureService != null
+            && IConfigurableService.PARENT_CHANNEL_NAME_SPACE.equals(configurable.getNameSpace())) {
+            parentConfigureService.update(configurable);
+        } else {
+            configureService.update(configurable);
+        }
+    }
+
+    /**
+     * Typed variant of {@link #queryConfigurableByIdent(String, String)}.
+     *
+     * @param <T> the type the caller expects the configurable to have (unchecked cast)
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T queryConfigurable(String configurableType, String name) {
+        return (T)queryConfigurableByIdent(configurableType, name);
+    }
+
+    /**
+     * Not supported by this class; it never loads anything itself.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    protected GetConfigureResult loadConfigurable(String namespace) {
+        return null;
+    }
+
+    //protected void initShareConfigurableService(String namespace) {
+    //    if (parentConfigureService == null) {
+    //        return;
+    //    }
+    //    shareConfigureService = new AbstractReadOnlyConfigurableService() {
+    //
+    //        @Override
+    //        public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+    //            refreshConfigurable(namespace);
+    //            return queryConfigurableByType(type);
+    //        }
+    //
+    //        @Override
+    //        protected List<IConfigurable> loadConfigurables(String namespace) {
+    //            List<IConfigurable> parent = parentConfigureService.queryConfigurable(ShareConfiguable.TYPE);
+    //            List<IConfigurable> shareConfigurables = new ArrayList<>();
+    //            if (parent == null) {
+    //                return shareConfigurables;
+    //            }
+    //            for (IConfigurable configurable : parent) {
+    //                ShareConfiguable shareConfiguable = (ShareConfiguable) configurable;
+    //                if (shareConfiguable.getShareAll() || shareConfiguable.getShareNameSpaces().contains(namespace)) {
+    //                    String sharedNameSpace = shareConfiguable.getSharedNameSpace();
+    //                    String sharedType = shareConfiguable.getSharedType();
+    //                    String sharedName = shareConfiguable.getSharedName();
+    //                    List<IConfigurable> sharedConfigrables =
+    //                        createAndQueryConfigurable(sharedNameSpace, sharedType, sharedName);
+    //                    if (sharedConfigrables != null) {
+    //                        shareConfigurables.addAll(sharedConfigrables);
+    //                    }
+    //                }
+    //            }
+    //            return shareConfigurables;
+    //        }
+    //
+    //
+    //    };
+    //    shareConfigureService.refreshConfigurable(namespace);
+    //
+    //}
+
+    /**
+     * Creates a separate configurable service for another namespace, refreshes it and queries it.
+     *
+     * @param sharedNameSpace the namespace to load
+     * @param sharedType      the configurable type to query
+     * @param sharedName      the configure name; when empty, all configurables of the type are returned
+     * @return with a name: a list holding the matching configurable (empty if there is none);
+     *         without a name: the result of the by-type query
+     */
+    protected List<IConfigurable> createAndQueryConfigurable(String sharedNameSpace, String sharedType,
+                                                             String sharedName) {
+        IConfigurableService innerSharedConfigurableService =
+            ConfigurableServiceFactory.createConfigurableService(properties);
+        innerSharedConfigurableService.refreshConfigurable(sharedNameSpace);
+        if (StringUtil.isNotEmpty(sharedName)) {
+            List<IConfigurable> configurables = new ArrayList<>();
+            IConfigurable configurable = innerSharedConfigurableService.queryConfigurableByIdent(sharedType, sharedName);
+            // Do not hand out a list containing null when nothing matches.
+            if (configurable != null) {
+                configurables.add(configurable);
+            }
+            return configurables;
+        } else {
+            return innerSharedConfigurableService.queryConfigurable(sharedType);
+        }
+
+    }
+
+    /**
+     * Returns all configurables of the parent layer followed by those of the requested namespace.
+     *
+     * @return a new list; never {@code null}
+     */
+    @Override
+    public Collection<IConfigurable> findAll() {
+        List<IConfigurable> configurables = new ArrayList<>();
+        if (parentConfigureService != null) {
+            Collection<IConfigurable> tmp = parentConfigureService.findAll();
+            if (tmp != null && !tmp.isEmpty()) {
+                configurables.addAll(tmp);
+            }
+        }
+        Collection<IConfigurable> tmp = configureService.findAll();
+        if (tmp != null && !tmp.isEmpty()) {
+            configurables.addAll(tmp);
+        }
+        return configurables;
+    }
+
+    public IConfigurableService getConfigureService() {
+        return configureService;
+    }
+
+    /**
+     * Loads the configurables of a type from storage: parent layer first, then the requested
+     * namespace.
+     *
+     * @param type the configurable type
+     * @return a new list; never {@code null}
+     */
+    @Override
+    public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+        List<T> configurables = new ArrayList<>();
+        if (parentConfigureService != null) {
+            Collection<T> tmp = parentConfigureService.loadConfigurableFromStorage(type);
+            if (tmp != null && !tmp.isEmpty()) {
+                configurables.addAll(tmp);
+            }
+        }
+        Collection<T> tmp = configureService.loadConfigurableFromStorage(type);
+        if (tmp != null && !tmp.isEmpty()) {
+            configurables.addAll(tmp);
+        }
+        return configurables;
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServcieType.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServcieType.java
new file mode 100644
index 0000000..a4998ff
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServcieType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+
+/**
+ * Holder for the configurable-service type names. Every constant re-exports the constant of the
+ * same name from {@link IConfigurableService}.
+ *
+ * @deprecated presumably in favour of the constants on {@link IConfigurableService} itself -
+ *             confirm before migrating usages.
+ */
+@Deprecated
+public class ConfigurableServcieType {
+
+    public static final String DEFAULT_SERVICE_NAME = IConfigurableService.DEFAULT_SERVICE_NAME;
+    public static final String MEMORY_SERVICE_NAME = IConfigurableService.MEMORY_SERVICE_NAME;
+    public static final String FILE_SERVICE_NAME = IConfigurableService.FILE_SERVICE_NAME;
+    public static final String HTTP_SERVICE_NAME = IConfigurableService.HTTP_SERVICE_NAME;
+
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
new file mode 100644
index 0000000..449f04a
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/ConfigurableServiceFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
+
+import java.util.Properties;
+
+/**
+ * Factory for {@link IConfigurableService} implementations. The implementation is selected by a
+ * type name that is resolved through the service loader.
+ */
+public class ConfigurableServiceFactory {
+    private static final ServiceLoaderComponent<IConfigurableService> configurableServiceLoaderComponent =
+        ServiceLoaderComponent.getInstance(IConfigurableService.class);
+    /** Property key that selects the service type. */
+    public static final String CONFIGURABLE_SERVICE_TYPE = "dipper.configurable.service.type";
+    private static final Log LOG = LogFactory.getLog(ConfigurableServiceFactory.class);
+
+    /**
+     * Creates the configurable service selected by {@link #CONFIGURABLE_SERVICE_TYPE}, falling
+     * back to {@code IConfigurableService.DEFAULT_SERVICE_NAME} when the property is not set.
+     * Services that extend {@link AbstractSupportParentConfigureService} are additionally
+     * initialized with a copy of the properties.
+     *
+     * @param properties the configuration of the service
+     * @return the service, or {@code null} if creating it failed (the error is logged)
+     */
+    public static IConfigurableService createConfigurableService(Properties properties) {
+        try {
+            // The service is initialized with a copy, not with the caller's Properties instance.
+            Properties properties1 = new Properties();
+            properties1.putAll(properties);
+            String type = properties1.getProperty(CONFIGURABLE_SERVICE_TYPE);
+            if (StringUtil.isEmpty(type)) {
+                type = IConfigurableService.DEFAULT_SERVICE_NAME;
+            }
+            IConfigurableService configurableService = getConfigurableServcieType(type);
+            if (configurableService instanceof AbstractSupportParentConfigureService) {
+                ((AbstractSupportParentConfigureService)configurableService).initMethod(properties1);
+            }
+            return configurableService;
+        } catch (Exception e) {
+            LOG.error("create ConfigurableService error", e);
+            return null;
+        }
+
+    }
+
+    /**
+     * Creates a new service instance for a type name.
+     *
+     * @param type the service type name
+     * @return a new instance of the implementation class registered for the type
+     * @throws NullPointerException if the service loader yields no service for the type
+     */
+    public static IConfigurableService getConfigurableServcieType(String type) {
+        IConfigurableService configurableService = (IConfigurableService)configurableServiceLoaderComponent.getService().loadService(type);
+        if (configurableService == null) {
+            // Same exception type as the implicit dereference below would raise, but naming the type.
+            throw new NullPointerException("No IConfigurableService found for type " + type);
+        }
+        // The loaded object only supplies the class name; callers get a fresh instance of that class.
+        return ReflectUtil.forInstance(configurableService.getClass().getName());
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
new file mode 100644
index 0000000..1a5fe9b
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service.impl;
+
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.configurable.model.Configure;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.common.utils.DateUtil;
+import org.apache.rocketmq.streams.common.utils.FileUtil;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.StringUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class FileConfigureService extends AbstractConfigurableService {
+
+    /** Property key the constructor reads the file location from; same value as IConfigurableService.FILE_PATH_NAME. */
+    public static final String FILE_PATH_NAME = IConfigurableService.FILE_PATH_NAME;
+    // path of the configuration file
+    private static final Log LOG = LogFactory.getLog(FileConfigureService.class);
+    private static final String DEFAULT_FILE_NAME = "dipper_configure.cs";                        // default file name
+    private static final String SIGN = "&&&&";                                       // field separator
+    /** Location of the backing file; assigned by initService. */
+    public String fileName;
+
+    /**
+     * Creates the service and resolves the backing file from the {@link #FILE_PATH_NAME} property.
+     *
+     * @param properties settings handed to the superclass and searched for the file path
+     */
+    public FileConfigureService(Properties properties) {
+        super(properties);
+        String configuredPath = properties.getProperty(FILE_PATH_NAME);
+        initService(configuredPath);
+    }
+
+    /**
+     * Decides which file backs this service and stores it in {@link #fileName}.
+     * <p>
+     * When {@code fileAndPath} is empty, the default file name is placed next to the location this
+     * class was loaded from (the directory containing the jar when loaded from a jar).
+     *
+     * @param fileAndPath explicit file location, may be empty
+     */
+    protected void initService(String fileAndPath) {
+        if (StringUtil.isEmpty(fileAndPath)) {
+            String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+            if (path.endsWith(".jar")) {
+                // The location is a URL path and therefore uses '/' on every platform; searching only
+                // for File.separator finds nothing on Windows and made substring(0, -1) throw.
+                int index = Math.max(path.lastIndexOf('/'), path.lastIndexOf(File.separatorChar));
+                if (index >= 0) {
+                    path = path.substring(0, index);
+                }
+            }
+            fileName = FileUtil.concatFilePath(path, DEFAULT_FILE_NAME);
+        } else {
+            fileName = fileAndPath;
+        }
+        LOG.info("load file from path = " + fileName);
+    }
+
+    /**
+     * Reads the backing file and returns the configurables stored for {@code namespace}.
+     *
+     * @param namespace namespace whose rows should be loaded
+     * @return a result whose query-success flag is {@code false} when loading failed
+     */
+    @Override
+    protected GetConfigureResult loadConfigurable(String namespace) {
+        GetConfigureResult result = new GetConfigureResult();
+        try {
+            List<Configure> configures = selectOpening(namespace);
+            List<IConfigurable> configurables = convert(configures);
+            LOG.info("load configure namespace=" + namespace + " count=" + configures.size());
+            result.setConfigurables(configurables);
+            // This flag marks whether the query succeeded; the original note says the configuration
+            // is not updated when it is false.
+            result.setQuerySuccess(true);
+        } catch (Exception e) {
+            result.setQuerySuccess(false);
+            // The logger already records the stack trace; no separate printStackTrace() is needed.
+            LOG.error("load configurable error ", e);
+        }
+        return result;
+    }
+
+    /**
+     * Loads every row of the backing file, converts the rows to {@code Configure} objects and keeps
+     * those belonging to {@code namespace}.
+     */
+    protected List<Configure> selectOpening(String namespace) {
+        return filter(convert2Configure(loadFileLine(fileName)), namespace);
+    }
+
+    /**
+     * Keeps the configures whose namespace equals {@code namespace}.
+     *
+     * @param configures candidates, may be {@code null}
+     * @param namespace  namespace to match
+     * @return a new list with the matching entries; empty when {@code configures} is {@code null}
+     * @throws RuntimeException when {@code configures} is non-null and {@code namespace} is empty
+     */
+    protected List<Configure> filter(List<Configure> configures, String namespace) {
+        if (configures == null) {
+            return new ArrayList<>();
+        }
+        if (StringUtil.isEmpty(namespace)) {
+            throw new RuntimeException("namespace can not empty ");
+        }
+        List<Configure> matched = new ArrayList<>();
+        for (Configure candidate : configures) {
+            if (namespace.equals(candidate.getNameSpace())) {
+                matched.add(candidate);
+            }
+        }
+        return matched;
+    }
+
+    /**
+     * Writes {@code configure} to the backing file: the first existing row with the same namespace,
+     * type and name is replaced, otherwise a new row is appended.
+     *
+     * @param configure object to store; {@code null} is ignored with a warning
+     */
+    @Override
+    protected void insertConfigurable(IConfigurable configure) {
+        if (configure == null) {
+            LOG.warn("insert configure is null");
+            return;
+        }
+        // Serialize once; the same row is used whether it replaces an old row or is appended.
+        String row = configure2String(configure);
+
+        List<String> rows = loadFileLine(fileName);
+        if (rows == null) {
+            rows = new ArrayList<>();
+        }
+        // convert2Configure yields one entry per row, so index i addresses the same record in both lists.
+        List<Configure> configures = convert2Configure(rows);
+        String newKey =
+            MapKeyUtil.createKey(configure.getNameSpace(), configure.getType(), configure.getConfigureName());
+        boolean isReplace = false;
+        for (int i = 0; i < configures.size(); i++) {
+            Configure c = configures.get(i);
+            String old = MapKeyUtil.createKey(c.getNameSpace(), c.getType(), c.getName());
+            if (old.equals(newKey)) {
+                rows.set(i, row);
+                isReplace = true;
+                break;
+            }
+        }
+        if (!isReplace) {
+            rows.add(row);
+        }
+        writeFile(fileName, rows);
+    }
+
+    /**
+     * Rewrites every row of the backing file whose namespace, type and name match {@code configure},
+     * then writes the file back (also when nothing matched).
+     *
+     * @param configure object carrying the new content; {@code null} is ignored with a warning
+     */
+    @Override
+    protected void updateConfigurable(IConfigurable configure) {
+        if (configure == null) {
+            LOG.warn("update configure is null");
+            return;
+        }
+
+        // Read through loadFileLine so the rows pass the same decode hook that writeFile's encode
+        // hook mirrors, exactly as insertConfigurable does.
+        List<String> rows = loadFileLine(fileName);
+        if (rows == null) {
+            rows = new ArrayList<>();
+        }
+        for (int i = 0; i < rows.size(); i++) {
+            String row = rows.get(i);
+            Configure oldConfigure = convert(row);
+            if (configure.getNameSpace().equals(oldConfigure.getNameSpace()) && configure.getType()
+                .equals(oldConfigure.getType()) && configure.getConfigureName().equals(oldConfigure.getName())) {
+                rows.set(i, configure2String(configure));
+            }
+        }
+        writeFile(fileName, rows);
+
+    }
+
+    /**
+     * Parses one file row into a {@code Configure}.
+     * <p>
+     * Columns are separated by {@code SIGN} and read in this order: namespace, type, name,
+     * json value, creation date, modification date, id. Missing columns and the literal text
+     * {@code "null"} are read as {@code null}.
+     *
+     * @param row one line of the backing file
+     * @return the populated configure
+     */
+    protected Configure convert(String row) {
+        String[] values = row.split(SIGN);
+        String namespace = getColumnValue(values, 0, "namespace");
+        String type = getColumnValue(values, 1, "type");
+        String name = getColumnValue(values, 2, "name");
+        String jsonValue = getColumnValue(values, 3, "json_value");
+        String createDate = getColumnValue(values, 4, "gmt_create");
+        String modifiedDate = getColumnValue(values, 5, "gmt_modified");
+        String id = getColumnValue(values, 6, "id");
+        Configure configure = new Configure();
+        configure.setNameSpace(namespace);
+        configure.setType(type);
+        configure.setName(name);
+        configure.setJsonValue(jsonValue);
+        configure.setGmtCreate(DateUtil.parse(createDate));
+        // Column 5 is the modification date; it used to be stored through setGmtCreate, which
+        // overwrote the creation date and left gmtModified unset.
+        configure.setGmtModified(DateUtil.parse(modifiedDate));
+        configure.setId((id == null ? null : Long.valueOf(id)));
+
+        return configure;
+    }
+
+    /**
+     * Converts each row to a {@code Configure}, preserving order (one result per input row).
+     */
+    protected List<Configure> convert2Configure(List<String> rows) {
+        List<Configure> parsed = new ArrayList<Configure>(rows.size());
+        for (String line : rows) {
+            Configure configure = convert(line);
+            parsed.add(configure);
+        }
+        return parsed;
+    }
+
+    /**
+     * Returns column {@code i} of a split row.
+     *
+     * @param values    the split row, may be {@code null} or empty
+     * @param i         column index
+     * @param namespace label passed by callers (the column's name); not used by this method
+     * @return the column text, or {@code null} when the column is absent or holds the text "null"
+     */
+    protected String getColumnValue(String[] values, int i, String namespace) {
+        boolean columnMissing = values == null || values.length == 0 || values.length <= i;
+        if (columnMissing || "null".equals(values[i])) {
+            return null;
+        }
+        return values[i];
+    }
+
+    /**
+     * Loads the lines of the file and passes them through the decode hook.
+     *
+     * @param fileName file to read
+     * @return the decoded rows; an empty list when the file yields nothing
+     */
+    protected List<String> loadFileLine(String fileName) {
+        List<String> loaded = FileUtil.loadFileLine(fileName);
+        return doDecRowList(loaded == null ? new ArrayList<String>() : loaded);
+    }
+
+    /**
+     * Passes {@code rows} through the encode hook and writes the result to {@code fileName}.
+     */
+    protected void writeFile(String fileName, List<String> rows) {
+        FileUtil.write(fileName, doEncryptRowList(rows));
+    }
+
+    /** Encode hook applied before writing; currently returns {@code rows} unchanged. */
+    private List<String> doEncryptRowList(List<String> rows) {
+        return rows;
+    }
+
+    /** Decode hook applied after reading; currently returns {@code rows} unchanged. */
+    private List<String> doDecRowList(List<String> rows) {
+        return rows;
+    }
+
+    /**
+     * Serializes {@code configure} into one file row: namespace, type, name, json, creation date,
+     * modification date and id, joined with {@code SIGN}.
+     * <p>
+     * Dates and id come from the object when it is an {@code Entity}; otherwise from a fresh,
+     * empty {@code Entity}.
+     */
+    protected String configure2String(IConfigurable configure) {
+        Entity meta = configure instanceof Entity ? (Entity)configure : new Entity();
+        return MapKeyUtil.createKeyBySign(SIGN, configure.getNameSpace(), configure.getType(),
+            configure.getConfigureName(), configure.toJson(), DateUtil.format(meta.getGmtCreate()),
+            DateUtil.format(meta.getGmtModified()), meta.getId() + "");
+    }
+
+    /** @return the location of the backing file as resolved by initService */
+    public String getFileName() {
+        return fileName;
+    }
+
+    /**
+     * Refreshes this service for its own namespace, then returns the configurables of {@code type}.
+     *
+     * @param type configurable type to query after the refresh
+     */
+    @Override
+    public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+        // Refresh first so the query below sees what is currently stored.
+        refreshConfigurable(getNamespace());
+        return queryConfigurableByType(type);
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java
new file mode 100644
index 0000000..aafa805
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service.impl;
+
+import org.apache.rocketmq.streams.configurable.service.AbstractSupportParentConfigureService;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import com.google.auto.service.AutoService;
+
+import java.util.Properties;
+
+/**
+ * File-backed, parent-aware configurable service, registered under
+ * {@code ConfigurableServcieType.FILE_SERVICE_NAME}.
+ */
+@AutoService(IConfigurableService.class)
+@ServiceName(ConfigurableServcieType.FILE_SERVICE_NAME)
+public class FileSupportParentConfigureService extends AbstractSupportParentConfigureService {
+
+    /**
+     * Creates two separate {@link FileConfigureService} instances, one for this service's own
+     * configuration and one for the parent configuration.
+     */
+    @Override
+    protected void initBeforeInitConfigurable(Properties property) {
+        // NOTE(review): the "property" parameter is not used; both services are built from
+        // "properties", which is declared outside this file - confirm that this is intended.
+        this.configureService = new FileConfigureService(properties);
+        this.parentConfigureService = new FileConfigureService(properties);
+    }
+
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java
new file mode 100644
index 0000000..b6d1fd9
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
+import org.apache.rocketmq.streams.common.configurable.IConfigurable;
+import org.apache.rocketmq.streams.configurable.service.AbstractConfigurableService;
+import org.apache.rocketmq.streams.common.utils.ReflectUtil;
+
+public class MemoryConfigureService extends AbstractConfigurableService {
+
+    // namespace -> configurables stored for it. Static, so the content is shared by every
+    // MemoryConfigureService instance; the map itself is a plain HashMap.
+    private static Map<String, List<IConfigurable>> namespace2Configure = new HashMap<>();
+
+    /**
+     * Creates the service; the properties are only handed to the superclass.
+     */
+    public MemoryConfigureService(Properties properties) {
+        super(properties);
+    }
+
+    /**
+     * Returns copies of the configurables stored for {@code namespace}.
+     * <p>
+     * Each stored object is re-created from its JSON form, so callers do not receive the stored
+     * instances themselves. The result is always marked successful; its list is {@code null} when
+     * nothing has been stored for the namespace.
+     */
+    @Override
+    protected GetConfigureResult loadConfigurable(String namespace) {
+        GetConfigureResult result = new GetConfigureResult();
+        result.setQuerySuccess(true);
+        List<IConfigurable> stored = namespace2Configure.get(namespace);
+        List<IConfigurable> copies = null;
+        if (stored != null) {
+            copies = new ArrayList<>();
+            // Iterate over a snapshot of the stored list rather than the list itself.
+            for (IConfigurable source : new ArrayList<IConfigurable>(stored)) {
+                IConfigurable copy = ReflectUtil.forInstance(source.getClass());
+                copy.toObject(source.toJson());
+                copy.setNameSpace(source.getNameSpace());
+                copy.setConfigureName(source.getConfigureName());
+                copies.add(copy);
+            }
+        }
+        result.setConfigurables(copies);
+        return result;
+    }
+
+    /**
+     * Stores {@code configurable} under its namespace, replacing an existing entry that has the
+     * same type and configure name.
+     *
+     * @param configurable object to store; {@code null} is ignored
+     */
+    @Override
+    protected void insertConfigurable(IConfigurable configurable) {
+        if (configurable == null) {
+            return;
+        }
+
+        String namespace = configurable.getNameSpace();
+        List<IConfigurable> list;
+        // The map is static and shared by all instances, so the get-or-create step has to use a lock
+        // that is shared as well; locking on "this" does not exclude other instances.
+        synchronized (namespace2Configure) {
+            list = namespace2Configure.get(namespace);
+            if (list == null) {
+                list = new ArrayList<>();
+                namespace2Configure.put(namespace, list);
+            }
+        }
+        // Find the entry to replace; without a break the last match wins, as before.
+        int removeIndex = -1;
+        for (int i = 0; i < list.size(); i++) {
+            IConfigurable config = list.get(i);
+            if (config.getType().equals(configurable.getType()) && config.getConfigureName()
+                .equals(configurable.getConfigureName())) {
+                removeIndex = i;
+            }
+        }
+        if (AbstractConfigurable.class.isInstance(configurable)) {
+            ((AbstractConfigurable)configurable).setConfigurableService(this);
+        }
+        if (removeIndex != -1) {
+            list.remove(removeIndex);
+        }
+        list.add(configurable);
+    }
+
+    /**
+     * Replaces the stored entry that has the same type and configure name as {@code configure}.
+     *
+     * @param configure replacement object
+     * @throws RuntimeException when no entry with that namespace, type and name is stored
+     */
+    @Override
+    protected void updateConfigurable(IConfigurable configure) {
+        List<IConfigurable> stored = namespace2Configure.get(configure.getNameSpace());
+        // A missing list is treated like an empty one: the loop is skipped and the throw is reached.
+        int total = stored == null ? 0 : stored.size();
+        for (int index = 0; index < total; index++) {
+            IConfigurable existing = stored.get(index);
+            boolean sameType = existing.getType().equals(configure.getType());
+            if (sameType && existing.getConfigureName().equals(configure.getConfigureName())) {
+                stored.set(index, configure);
+                return;
+            }
+        }
+        throw new RuntimeException(
+            "not have exist configure " + configure.getNameSpace() + "," + configure.getType() + ","
+                + configure.getConfigureName());
+    }
+
+    /**
+     * Refreshes this service for its own namespace, then returns the configurables of {@code type}.
+     *
+     * @param type configurable type to query after the refresh
+     */
+    @Override
+    public <T extends IConfigurable> List<T> loadConfigurableFromStorage(String type) {
+        // Refresh first so the query below sees what is currently stored.
+        refreshConfigurable(getNamespace());
+        return queryConfigurableByType(type);
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java
new file mode 100644
index 0000000..92d2095
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.service.impl;
+
+import java.util.Properties;
+
+import org.apache.rocketmq.streams.configurable.service.AbstractSupportParentConfigureService;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.service.ConfigurableServcieType;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import com.google.auto.service.AutoService;
+
+/**
+ * Memory-backed, parent-aware configurable service, registered under
+ * {@code ConfigurableServcieType.MEMORY_SERVICE_NAME}.
+ */
+@AutoService(IConfigurableService.class)
+@ServiceName(ConfigurableServcieType.MEMORY_SERVICE_NAME)
+public class MemorySupportParentConfigureService extends AbstractSupportParentConfigureService {
+
+    /**
+     * Creates two separate {@link MemoryConfigureService} instances, one for the parent
+     * configuration and one for this service's own configuration.
+     */
+    @Override
+    protected void initBeforeInitConfigurable(Properties property) {
+        // NOTE(review): the "property" parameter is not used; both services are built from
+        // "properties", which is declared outside this file - confirm that this is intended.
+        this.parentConfigureService = new MemoryConfigureService(properties);
+        this.configureService = new MemoryConfigureService(properties);
+    }
+}
diff --git a/rocketmq-streams-configurable/src/main/resources/log4j.xml b/rocketmq-streams-configurable/src/main/resources/log4j.xml
new file mode 100755
index 0000000..7812fe7
--- /dev/null
+++ b/rocketmq-streams-configurable/src/main/resources/log4j.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Use the conventional relative system id for the log4j 1.x DTD instead of an internal host URL. -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <!-- Console output, restricted to the levels INFO through ERROR. -->
+    <appender name="Console" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d{ISO8601} %l [%t] %-5p - %m%n%n"/>
+        </layout>
+        <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="LevelMin" value="INFO"/>
+            <param name="LevelMax" value="ERROR"/>
+        </filter>
+    </appender>
+
+    <root>
+        <priority value="INFO"/>
+        <appender-ref ref="Console"/>
+    </root>
+
+</log4j:configuration>
\ No newline at end of file
diff --git a/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/ConfiguableComponentTest.java b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/ConfiguableComponentTest.java
new file mode 100644
index 0000000..50cc83b
--- /dev/null
+++ b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/ConfiguableComponentTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable;
+
+import java.util.List;
+
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configuable.model.Person;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertTrue;
+
+/**
+ * Exercises ConfigurableComponent: insert and refresh, ENVDependence resolution, parent-namespace
+ * visibility and scheduled loading. All tests use the namespace "org.apache.configuable.test".
+ */
+public class ConfiguableComponentTest {
+
+    /** Inserts a person and checks that it can be queried after an explicit refresh. */
+    @Test
+    public void testInsertConfiguable(){
+        String namespace="org.apache.configuable.test";
+        ConfigurableComponent configurableComponent= ConfigurableComponent.getInstance(namespace);
+        Person person=createPerson(namespace);
+        configurableComponent.insert(person);// persists the object; the storage type is set in the configuration file (memory, db and file are supported, memory is the default)
+        // Queries only read the in-memory view; stored data is loaded into memory periodically, so freshly inserted data is not loaded yet and cannot be found.
+        // NOTE(review): this lookup uses "personName", while createPerson registers "peronName",
+        // so the result is null whether or not the insert is already visible.
+        assertTrue(configurableComponent.queryConfigurable("person","personName")==null);
+        configurableComponent.refreshConfigurable(namespace);// force-load the stored data into memory so that it can be queried
+        assertTrue(configurableComponent.queryConfigurable("person","peronName")!=null);
+    }
+
+
+    /** Stores a key in an ENVDependence field and expects the configured value after a refresh. */
+    @Test
+    public void testConfiguableENVDependence(){
+        String namespace="org.apache.configuable.test";
+        ConfigurableComponent configurableComponent= ConfigurableComponent.getInstance(namespace);
+        Person person=createPerson(namespace);
+        person.setName("persion.name");// for fields marked ENVDependence a key can be stored instead of the real value; the real value is set in the configuration file
+        configurableComponent.insert(person);// persists the object; the storage type is set in the configuration file (memory, db and file are supported, memory is the default)
+        ComponentCreator.getProperties().put("persion.name","realName");// stands in for the real configuration file; at startup its content is loaded into ComponentCreator.getProperties()
+        configurableComponent.refreshConfigurable(namespace);// refresh from storage
+        person=configurableComponent.queryConfigurable("person","peronName");
+        assertTrue(person.getName().equals("realName"));
+    }
+
+
+    /** Checks namespace isolation and that parent-namespace objects are included after a refresh. */
+    @Test
+    public void testSupportParentNameSpace(){
+        String namespace="org.apache.configuable.test";
+        ConfigurableComponent configurableComponent= ConfigurableComponent.getInstance(namespace);
+        Person person=createPerson(namespace);
+        Person otherPerson=createPerson("org.apache.configuable.test1");
+        configurableComponent.insert(person);
+        configurableComponent.insert(otherPerson);
+        configurableComponent.refreshConfigurable(namespace);
+        // only objects of the component's own namespace are loaded
+        List<Person> personList=configurableComponent.queryConfigurableByType("person");
+        assertTrue(personList.size()==1);
+
+        /**
+         * Objects in the top-level namespace are visible to every namespace.
+         */
+        Person thirdPerson=createPerson(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+        configurableComponent.insert(thirdPerson);
+        configurableComponent.refreshConfigurable(namespace);// refresh again; the assertion below expects the parent-namespace object to be included
+        personList=configurableComponent.queryConfigurableByType("person");
+        assertTrue(personList.size()==2);
+    }
+
+
+    // Tests the scheduled loading logic: an old object is only replaced once the object's updateFlag value has changed.
+    @Test
+    public void testAutoLoader() throws InterruptedException {
+        ComponentCreator.getProperties().put(AbstractComponent.POLLING_TIME,"1");// objects are loaded dynamically after 1 second
+        String namespace="org.apache.configuable.test";
+        ConfigurableComponent configurableComponent= ConfigurableComponent.getInstance(namespace);
+        Person person=createPerson(namespace);
+        configurableComponent.insert(person);
+        Thread.sleep(2000);// after 1 second the newly inserted object will have been loaded
+        person=configurableComponent.queryConfigurable("person","peronName");
+        assertTrue(person!=null);
+
+
+    }
+
+    /**
+     * Creates the configurable object used by the tests: type "person", configure name
+     * "peronName", name "chris", age 18.
+     * @param namespace namespace assigned to the created person
+     * @return the new person
+     */
+    protected Person createPerson(String namespace){
+        Person person=new Person();
+        person.setName("chris");
+        person.setAge(18);
+        person.setNameSpace(namespace);
+        person.setConfigureName("peronName");
+        person.setType("person");
+        return person;
+    }
+}
diff --git a/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java
new file mode 100644
index 0000000..8c4f076
--- /dev/null
+++ b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configuable/model/Person.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.streams.configuable.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+/**
+ * Sample configurable used by the tests: a person with a name, an age, a gender flag, a list of
+ * addresses and a map from child name to age.
+ */
+public class Person extends BasedConfigurable{
+    // Marked ENVDependence: the tests store a property key here and expect the configured value back.
+    @ENVDependence
+    private String name;
+    private int age;
+    private Boolean isMale;
+    private List<String> addresses;
+    private Map<String, Integer> childName2Age;
+
+    /**
+     * Builds a fully populated sample person of type "person" with configure name "Chris".
+     *
+     * @param namespace namespace assigned to the created person
+     * @return the new person
+     */
+    public static Person createPerson(String namespace) {
+        Person person = new Person();
+        person.setNameSpace(namespace);
+        person.setType("person");
+        person.setConfigureName("Chris");
+        person.setName("Chris");
+        List<String> addresses = new ArrayList<>();
+        addresses.add("huilongguan");
+        addresses.add("shangdi");
+        person.setAddresses(addresses);
+        Map<String, Integer> childName2Age = new HashMap<>();
+        childName2Age.put("yuanyahan", 8);
+        childName2Age.put("yuanruxi", 4);
+        person.setChildName2Age(childName2Age);
+        person.setMale(true);
+        person.setAge(18);
+        return person;
+    }
+
+    /** Lists the five fields declared in this class. */
+    @Override
+    public String toString() {
+        return "Person{" + "name='" + name + '\'' + ", age=" + age + ", isMale=" + isMale + ", addresses=" + addresses
+                + ", childName2Age=" + childName2Age + '}';
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public int getAge() {
+        return age;
+    }
+
+    public void setAge(int age) {
+        this.age = age;
+    }
+
+    // Accessors for the isMale field.
+    public Boolean getMale() {
+        return isMale;
+    }
+
+    public void setMale(Boolean male) {
+        isMale = male;
+    }
+
+    public List<String> getAddresses() {
+        return addresses;
+    }
+
+    public void setAddresses(List<String> addresses) {
+        this.addresses = addresses;
+    }
+
+    public Map<String, Integer> getChildName2Age() {
+        return childName2Age;
+    }
+
+    public void setChildName2Age(Map<String, Integer> childName2Age) {
+        this.childName2Age = childName2Age;
+    }
+
+    /**
+     * Delegates to {@code super.clone()}.
+     *
+     * @return the clone, or {@code null} when cloning is not supported (the error is printed to
+     *         standard output)
+     */
+    @Override
+    public Object clone() {
+        Person person = null;
+        try {
+            person = (Person)super.clone();
+        } catch (CloneNotSupportedException e) {
+            System.out.println("clone error " + e);
+        }
+        return person;
+    }
+}
diff --git a/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/ConfigurableComponentTest.java b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/ConfigurableComponentTest.java
new file mode 100644
index 0000000..6e632b1
--- /dev/null
+++ b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/ConfigurableComponentTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable;
+
+import java.util.List;
+
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
+import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
+import org.apache.rocketmq.streams.configurable.model.Person;
+import org.junit.Test;
+
+import static junit.framework.TestCase.assertTrue;
+
+public class ConfigurableComponentTest {
+
+    @Test
+    public void testInsertConfigurable() {
+        String namespace = "org.apache.configurable.test";
+        ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+        Person person = createPerson(namespace);
+        configurableComponent.insert(person);//完成数据存储,在配置文件配置存储类型,支持内存,db和文件,默认是内存
+        //查询只操作内存,存储的数据定时加载到内存,刚插入的数据,还未加载,查询不到
+        assertTrue(configurableComponent.queryConfigurable("person", "personName") == null);
+        configurableComponent.refreshConfigurable(namespace);//强制加载数据到内存,可以查询数据
+        assertTrue(configurableComponent.queryConfigurable("person", "peronName") != null);
+    }
+
+    @Test
+    public void testConfigurableENVDependence() {
+        String namespace = "org.apache.configurable.test";
+        ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+        Person person = createPerson(namespace);
+        person.setName("persion.name");//对于有ENVDependence的字段,可以不存储真值,存储一个key,把真值配置在配置文件中
+        configurableComponent.insert(person);//完成数据存储,在配置文件配置存储类型,支持内存,db和文件,默认是内存
+        ComponentCreator.getProperties().put("persion.name", "realName");//这个代表真实的配置文件,启动时会把配置文件的内容加载到ComponentCreator.getProperties()中
+        configurableComponent.refreshConfigurable(namespace);//刷新存储
+        person = configurableComponent.queryConfigurable("person", "peronName");
+        assertTrue(person.getName().equals("realName"));
+    }
+
+    @Test
+    public void testSupportParentNameSpace() {
+        String namespace = "org.apache.configurable.test";
+        ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+        Person person = createPerson(namespace);
+        Person otherPerson = createPerson("org.apache.configuable.test1");
+        configurableComponent.insert(person);
+        configurableComponent.insert(otherPerson);
+        configurableComponent.refreshConfigurable(namespace);
+        //只加载自己命名空间的对象
+        List<Person> personList = configurableComponent.queryConfigurableByType("person");
+        assertTrue(personList.size() == 1);
+
+        /**
+         * 顶级命名空间的对象,所有namespace都可见
+         */
+        Person thirdPerson = createPerson(IConfigurableService.PARENT_CHANNEL_NAME_SPACE);
+        configurableComponent.insert(thirdPerson);
+        configurableComponent.refreshConfigurable(namespace);//只加载自己命名空间的对象
+        personList = configurableComponent.queryConfigurableByType("person");
+        assertTrue(personList.size() == 2);
+    }
+
+    //测试定时加载逻辑,当对象的updateFlag值变化后,才会被替换旧对象
+    @Test
+    public void testAutoLoader() throws InterruptedException {
+        ComponentCreator.getProperties().put(AbstractComponent.POLLING_TIME, "1");//1秒后动态加载对象
+        String namespace = "org.apache.configurable.test";
+        ConfigurableComponent configurableComponent = ConfigurableComponent.getInstance(namespace);
+        Person person = createPerson(namespace);
+        configurableComponent.insert(person);
+        Thread.sleep(2000);//1秒后,新插入的对象会被加载
+        person = configurableComponent.queryConfigurable("person", "peronName");
+        assertTrue(person != null);
+
+    }
+
+    /**
+     * 创建configurable对象
+     *
+     * @param namespace
+     * @return
+     */
+    protected Person createPerson(String namespace) {
+        Person person = new Person();
+        person.setName("chris");
+        person.setAge(18);
+        person.setNameSpace(namespace);
+        person.setConfigureName("peronName");
+        person.setType("person");
+        return person;
+    }
+}
diff --git a/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java
new file mode 100644
index 0000000..06fd678
--- /dev/null
+++ b/rocketmq-streams-configurable/src/test/java/org/apache/rocketmq/streams/configurable/model/Person.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.configurable.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
+
+/**
+ * Sample configurable describing a person, used as a fixture by the configurable tests.
+ */
+public class Person extends BasedConfigurable {
+    @ENVDependence
+    private String name;
+    private int age;
+    private Boolean isMale;
+    private List<String> addresses;
+    private Map<String, Integer> childName2Age;
+
+    /**
+     * Builds a fully populated sample person.
+     *
+     * @param namespace namespace assigned to the new person
+     * @return a person of type "person" whose configure name and name are both "Chris"
+     */
+    public static Person createPerson(String namespace) {
+        List<String> homes = new ArrayList<>();
+        homes.add("huilongguan");
+        homes.add("shangdi");
+
+        Map<String, Integer> children = new HashMap<>();
+        children.put("yuanyahan", 8);
+        children.put("yuanruxi", 4);
+
+        Person sample = new Person();
+        sample.setNameSpace(namespace);
+        sample.setType("person");
+        sample.setConfigureName("Chris");
+        sample.setName("Chris");
+        sample.setAddresses(homes);
+        sample.setChildName2Age(children);
+        sample.setMale(true);
+        sample.setAge(18);
+        return sample;
+    }
+
+    /**
+     * Renders this person's own fields in {@code Person{...}} form.
+     *
+     * @return text listing name, age, isMale, addresses and childName2Age
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("Person{");
+        text.append("name='").append(name).append('\'');
+        text.append(", age=").append(age);
+        text.append(", isMale=").append(isMale);
+        text.append(", addresses=").append(addresses);
+        text.append(", childName2Age=").append(childName2Age);
+        text.append('}');
+        return text.toString();
+    }
+
+    /** @return the current value of the {@code name} field */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @param name value to store in the {@code name} field */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the current value of the {@code age} field */
+    public int getAge() {
+        return this.age;
+    }
+
+    /** @param age value to store in the {@code age} field */
+    public void setAge(int age) {
+        this.age = age;
+    }
+
+    /** @return the current value of the {@code isMale} field */
+    public Boolean getMale() {
+        return this.isMale;
+    }
+
+    /** @param male value to store in the {@code isMale} field */
+    public void setMale(Boolean male) {
+        this.isMale = male;
+    }
+
+    /** @return the list held in the {@code addresses} field (the same instance, not a copy) */
+    public List<String> getAddresses() {
+        return this.addresses;
+    }
+
+    /** @param addresses list to store in the {@code addresses} field (stored as given) */
+    public void setAddresses(List<String> addresses) {
+        this.addresses = addresses;
+    }
+
+    /** @return the map held in the {@code childName2Age} field (the same instance, not a copy) */
+    public Map<String, Integer> getChildName2Age() {
+        return this.childName2Age;
+    }
+
+    /** @param childName2Age map to store in the {@code childName2Age} field (stored as given) */
+    public void setChildName2Age(Map<String, Integer> childName2Age) {
+        this.childName2Age = childName2Age;
+    }
+
+    /**
+     * Delegates to {@code super.clone()} and casts the result to {@code Person}.
+     *
+     * @return the object produced by {@code super.clone()}, or {@code null} when that call throws
+     *         {@link CloneNotSupportedException}
+     */
+    @Override
+    public Object clone() {
+        try {
+            return (Person)super.clone();
+        } catch (CloneNotSupportedException e) {
+            // NOTE(review): the failure is only printed to stdout and the caller receives null.
+            System.out.println("clone error " + e);
+            return null;
+        }
+    }
+}
diff --git a/rocketmq-streams-configurable/src/test/resources/log4j.xml b/rocketmq-streams-configurable/src/test/resources/log4j.xml
new file mode 100755
index 0000000..7812fe7
--- /dev/null
+++ b/rocketmq-streams-configurable/src/test/resources/log4j.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Test logging configuration: a single console appender attached to the root logger. -->
+<!-- The DTD is referenced by its conventional system id instead of a host-specific URL. -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+    <appender name="Console" class="org.apache.log4j.ConsoleAppender">
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d{ISO8601} %l [%t] %-5p - %m%n%n"/>
+        </layout>
+        <!-- Restricts console output to the INFO..ERROR range. -->
+        <!-- NOTE(review): with LevelMax=ERROR, FATAL events appear to be excluded; confirm this is intended. -->
+        <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="LevelMin" value="INFO"/>
+            <param name="LevelMax" value="ERROR"/>
+        </filter>
+    </appender>
+
+    <root>
+        <priority value="INFO"/>
+        <appender-ref ref="Console"/>
+    </root>
+
+</log4j:configuration>
\ No newline at end of file