You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/03/11 07:54:47 UTC

[rocketmq-streams] branch main updated: [ISSUES 135] App can not start up with RocketMQ-streams (#136)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new fe3cc4e  [ISSUES 135] App can not start up with RocketMQ-streams  (#136)
fe3cc4e is described below

commit fe3cc4e2015aeeb7b5ecc784828a27bfd403dae3
Author: Ni Ze <un...@gmail.com>
AuthorDate: Fri Mar 11 15:54:42 2022 +0800

    [ISSUES 135] App can not start up with RocketMQ-streams  (#136)
    
    * fix(jar) make rocketmq-streams runnable as a sdk
    
    * fix(example)make example runnable
---
 rocketmq-streams-channel-rocketmq/pom.xml          |   4 -
 .../common/channel/impl/CollectionSink.java        |  55 -----
 .../common/channel/impl/CollectionSinkBuilder.java |  50 -----
 .../common/channel/impl/PrintChannelBuilder.java   |  46 ----
 .../channel/impl/file/FileChannelBuilder.java      |  94 --------
 .../channel/impl/memory/MemoryChannelBuilder.java  |  64 ------
 .../channel/impl/mutiltask/MutilSinkBuilder.java   |  67 ------
 .../channel/impl/mutiltask/MutilTaskSink.java      |  87 -------
 .../impl/transit/TransitChannelBuilder.java        |  69 ------
 .../common/channel/impl/transit/TransitSink.java   | 250 ---------------------
 .../common/channel/impl/transit/TransitSource.java |  66 ------
 .../db/configuable}/FileConfigureService.java      |   2 +-
 .../FileSupportParentConfigureService.java         |   2 +-
 .../db/configuable}/MemoryConfigureService.java    |   2 +-
 .../MemorySupportParentConfigureService.java       |   2 +-
 .../checkpoint/RemoteCheckpointExample.java        |   4 +-
 .../mutilconsumer/MultiStreamsExample.java         |   4 +-
 17 files changed, 8 insertions(+), 860 deletions(-)

diff --git a/rocketmq-streams-channel-rocketmq/pom.xml b/rocketmq-streams-channel-rocketmq/pom.xml
index cad1c1b..d99a574 100644
--- a/rocketmq-streams-channel-rocketmq/pom.xml
+++ b/rocketmq-streams-channel-rocketmq/pom.xml
@@ -31,10 +31,6 @@
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-streams-commons</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-streams-serviceloader</artifactId>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java
deleted file mode 100644
index d3431cc..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
-import org.apache.rocketmq.streams.common.context.IMessage;
-
-/**
- * @description just support json object
- */
-public class CollectionSink extends AbstractSink {
-
-    List<Object> data;
-
-    public CollectionSink() {
-        data = new ArrayList<>();
-    }
-
-    public CollectionSink(List<Object> data) {
-        this.data = data;
-    }
-
-    @Override
-    protected synchronized boolean batchInsert(List<IMessage> messages) {
-        for (IMessage msg : messages) {
-            if (msg.isJsonMessage()) {
-                data.add(msg.getMessageBody().toJSONString());
-            } else {
-                data.add(msg.getMessageValue());
-            }
-        }
-
-        return false;
-    }
-
-    public List<Object> getData() {
-        return data;
-    }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java
deleted file mode 100644
index f2d67a0..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl;
-
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-
-/**
- * @description
- */
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = CollectionSinkBuilder.TYPE)
-public class CollectionSinkBuilder implements IChannelBuilder {
-
-    public static final String TYPE = "collection";
-
-    @Override
-    public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-        return new CollectionSource();
-    }
-
-    @Override
-    public String getType() {
-        return TYPE;
-    }
-
-    @Override
-    public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
-        return new CollectionSink();
-    }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/PrintChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/PrintChannelBuilder.java
deleted file mode 100644
index a48e930..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/PrintChannelBuilder.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl;
-
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = PrintChannelBuilder.TYPE, aliasName = "OutputPrintChannel")
-public class PrintChannelBuilder implements IChannelBuilder {
-    public static final String TYPE = "print";
-
-    @Override
-    public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-        throw new RuntimeException("can not support this method");
-    }
-
-    @Override
-    public String getType() {
-        return TYPE;
-    }
-
-    @Override
-    public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
-        return new OutputPrintChannel();
-    }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileChannelBuilder.java
deleted file mode 100644
index eb84689..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileChannelBuilder.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl.file;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.impl.memory.MemoryChannel;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = FileChannelBuilder.TYPE, aliasName = "FileSource")
-public class FileChannelBuilder extends AbstractSupportShuffleChannelBuilder {
-
-    public static final String TYPE = "file";
-
-    /**
-     * @param namespace
-     * @param name
-     * @param properties
-     * @return
-     */
-    @Override
-    public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-
-        FileSource fileSource = (FileSource) ConfigurableUtil.create(FileSource.class.getName(), namespace, name, createFormatProperty(properties), null);
-        return fileSource;
-    }
-
-    @Override
-    public String getType() {
-        return TYPE;
-    }
-
-    @Override
-    public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
-        FileSink fileSink = (FileSink) ConfigurableUtil.create(FileSink.class.getName(), namespace, name, createFormatProperty(properties), null);
-        return fileSink;
-    }
-
-    /**
-     * 创建标准的属性文件
-     *
-     * @param properties
-     * @return
-     */
-    @Override
-    protected JSONObject createFormatProperty(Properties properties) {
-        JSONObject formatProperties = new JSONObject();
-        for (Object object : properties.keySet()) {
-            String key = (String) object;
-            if ("type".equals(key)) {
-                continue;
-            }
-            formatProperties.put(key, properties.get(key));
-        }
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "filePath", "filePath");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "filePath", "filepath");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "filePath", "fileName");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "thread.max.count");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "maxthread");
-        return formatProperties;
-    }
-
-    @Override
-    public ISource copy(ISource pipelineSource) {
-        return new MemoryChannel();
-    }
-
-    @Override
-    public ISink createBySource(ISource pipelineSource) {
-        return new MemoryChannel();
-    }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannelBuilder.java
deleted file mode 100644
index 720e3e1..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannelBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl.memory;
-
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = MemoryChannelBuilder.TYPE, aliasName = "MemorySource")
-public class MemoryChannelBuilder extends AbstractSupportShuffleChannelBuilder {
-
-    public static final String TYPE = "memory";
-
-    /**
-     * @param namespace
-     * @param name
-     * @param properties
-     * @return
-     */
-    @Override
-    public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-        return new MemorySource();
-    }
-
-    @Override
-    public String getType() {
-        return TYPE;
-    }
-
-    @Override
-    public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
-        return new MemorySink();
-    }
-
-    @Override
-    public ISource copy(ISource pipelineSource) {
-        return new MemoryChannel();
-    }
-
-    @Override
-    public ISink createBySource(ISource pipelineSource) {
-        return new MemoryChannel();
-    }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilSinkBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilSinkBuilder.java
deleted file mode 100644
index 428a619..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilSinkBuilder.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl.mutiltask;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = MutilSinkBuilder.SINK_TYPE, aliasName = "tasks",name="MutilTaskSink")
-public class MutilSinkBuilder implements IChannelBuilder {
-    public static final String SINK_TYPE="mutil_task";
-    @Override public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-        throw new RuntimeException("can not support this method");
-    }
-
-
-
-    @Override public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
-        MutilTaskSink sink = (MutilTaskSink) ConfigurableUtil.create(MutilTaskSink.class.getName(), namespace, name, createFormatProperty(properties), null);
-        return sink;
-    }
-
-
-
-    protected JSONObject createFormatProperty(Properties properties) {
-        JSONObject formatProperties = new JSONObject();
-        for (Object object : properties.keySet()) {
-            String key = (String) object;
-            if ("type".equals(key)) {
-                continue;
-            }
-            formatProperties.put(key, properties.getProperty(key));
-        }
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "configureName", "taskName");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "configureName", "name");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "configureName", "task_name");
-        return formatProperties;
-    }
-
-
-    @Override
-    public String getType() {
-       return MutilSinkBuilder.SINK_TYPE;
-    }
-
-
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java
deleted file mode 100644
index 5ad76de..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl.mutiltask;
-
-import java.util.List;
-import java.util.Set;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
-import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
-import org.apache.rocketmq.streams.common.context.Context;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.topology.task.StreamsTask;
-
-public class MutilTaskSink extends AbstractSink implements IAfterConfigurableRefreshListener {
-
-    protected transient StreamsTask streamsTask;
-    protected String taskName;
-
-    @Override public boolean batchAdd(IMessage message) {
-        Context context = new Context(message);
-        streamsTask.doMessage(message, context);
-        return true;
-    }
-
-    @Override public boolean batchSave(List<IMessage> messages) {
-        if (messages != null) {
-            for (IMessage message : messages) {
-                batchAdd(message);
-            }
-        }
-        return true;
-    }
-
-    @Override public boolean flush(Set<String> splitIds) {
-        return true;
-    }
-
-    @Override public boolean flush(String... splitIds) {
-        return true;
-    }
-
-    @Override protected boolean batchInsert(List<IMessage> messages) {
-        throw new RuntimeException("can not support this method");
-    }
-
-    @Override public void closeAutoFlush() {
-
-    }
-
-    @Override public boolean flush() {
-        return true;
-    }
-
-    @Override protected boolean initConfigurable() {
-        return true;
-    }
-
-    @Override public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
-        streamsTask = new StreamsTask();
-        streamsTask.setConfigureName(taskName);
-        streamsTask.setNameSpace(getNameSpace());
-        streamsTask.init();
-        streamsTask.doProcessAfterRefreshConfigurable(configurableService);
-    }
-
-    public String getTaskName() {
-        return taskName;
-    }
-
-    public void setTaskName(String taskName) {
-        this.taskName = taskName;
-    }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java
deleted file mode 100644
index 96dee19..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.streams.common.channel.impl.transit;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = TransitChannelBuilder.TYPE, aliasName = "cache_table")
-public class TransitChannelBuilder implements IChannelBuilder {
-    public static final String TYPE = "transit";
-
-    @Override public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-        return (TransitSource) ConfigurableUtil.create(TransitSource.class.getName(), namespace, name, createFormatProperty(properties), null);
-    }
-
-    @Override public String getType() {
-        return TYPE;
-    }
-
-    @Override public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
-        return (TransitSink) ConfigurableUtil.create(TransitSink.class.getName(), namespace, name, createFormatProperty(properties), null);
-    }
-
-    /**
-     * 创建标准的属性文件
-     *
-     * @param properties
-     * @return
-     */
-    protected JSONObject createFormatProperty(Properties properties) {
-        JSONObject formatProperties = new JSONObject();
-        for (Object object : properties.keySet()) {
-            String key = (String) object;
-            if ("type".equals(key)) {
-                continue;
-            }
-            formatProperties.put(key, properties.getProperty(key));
-        }
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "logFingerprintFieldNames", "filterFieldNames");
-
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "logFingerprintFieldNames", "filterfieldnames");
-        IChannelBuilder.formatPropertiesName(formatProperties, properties, "tableName", "sql_create_table_name");
-//        IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "thread.max.count");
-        return formatProperties;
-    }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSink.java
deleted file mode 100644
index d48526a..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSink.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.streams.common.channel.impl.transit;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
-import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
-import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.Context;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.optimization.MessageGlobleTrace;
-import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
-import org.apache.rocketmq.streams.common.topology.ChainPipeline;
-import org.apache.rocketmq.streams.common.topology.model.Pipeline;
-import org.apache.rocketmq.streams.common.topology.model.PipelineSourceJoiner;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-
-public class TransitSink extends AbstractSink  implements IAfterConfigurableRefreshListener {
-    private static final Log LOG = LogFactory.getLog(TransitSink.class);
-    protected transient List<ChainPipeline> piplines = new ArrayList<>();
-    protected String tableName;
-    @ENVDependence
-    protected String logFingerprintFieldNames;//config log finger
-
-    @Override public boolean batchAdd(IMessage message) {
-        boolean onlyOne = piplines.size() == 1;
-        int index = 0;
-        //可以启动重复过滤,把日志中的必须字段抽取出来,做去重,如果某个日志,在某个pipline执行不成功,下次类似日志过来,直接过滤掉
-        BitSetCache.BitSet bitSet = getFilterValue(message);
-        if(bitSet==null&&logFingerprintFieldNames!=null){
-            bitSet=new BitSetCache.BitSet(piplines.size());
-        }
-        for (ChainPipeline pipline : piplines) {
-            if (bitSet!=null&&bitSet.get(index)) {
-                continue;
-            }
-
-            IMessage copyMessage = message;
-            if (!onlyOne) {
-                copyMessage = message.deepCopy();
-            }
-            Context newContext = new Context(copyMessage);
-            try {
-
-                pipline.doMessage(copyMessage, newContext);
-                if (!MessageGlobleTrace.existFinishBranch(copyMessage)) {
-                    if(bitSet!=null){
-                        bitSet.set(index);
-                        addNoFireMessage(message, bitSet);
-                    }
-
-                }
-
-            } catch (Exception e) {
-                LOG.error("pipline execute error " + pipline.getConfigureName(), e);
-            }
-            index++;
-
-        }
-        return true;
-    }
-
-
-
-    @Override public boolean checkpoint(Set<String> splitIds) {
-        return super.checkpoint(splitIds);
-    }
-
-    @Override public boolean checkpoint(String... splitIds) {
-        return super.checkpoint(splitIds);
-    }
-
-    @Override protected boolean batchInsert(List<IMessage> messages) {
-        return false;
-    }
-
-    @Override protected boolean initConfigurable() {
-        boolean success= super.initConfigurable();
-        this.messageCache=null;
-        return true;
-    }
-
-    @Override public boolean flush(Set<String> splitIds) {
-        return true;
-    }
-
-    @Override public boolean flush(String... splitIds) {
-        return true;
-    }
-
-    @Override public boolean flush() {
-        return true;
-    }
-
-    @Override public void openAutoFlush() {
-    }
-
-    @Override public void closeAutoFlush() {
-    }
-
-    @Override public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
-        List<ChainPipeline> piplines = new ArrayList<>();
-
-        loadSubPiplines(piplines, configurableService);//通过PiplineSourceJoiner装载子pipline
-
-
-        /**
-         * 做排序,确保pipline对应的index和messageRepeateFileter 的一致
-         */
-        Collections.sort(piplines, new Comparator<ChainPipeline>() {
-            @Override
-            public int compare(ChainPipeline o1, ChainPipeline o2) {
-                return o1.getConfigureName().compareTo(o2.getConfigureName());
-            }
-        });
-        if (!equalsPiplines(this.piplines, piplines)) {
-            this.piplines = piplines;
-
-        }
-    }
-
-    /**
-     * 如果确定这个message,在某个pipline不触发,则记录下来,下次直接跳过,不执行
-     *
-     * @param message
-     * @param message
-     */
-    protected void addNoFireMessage(IMessage message, BitSetCache.BitSet bitSet) {
-        if (this.logFingerprintFieldNames == null) {
-            return;
-        }
-
-        FingerprintCache.getInstance().addLogFingerprint(getOrCreateFingerNameSpace(),message,bitSet,this.logFingerprintFieldNames);
-    }
-
-
-    /**
-     * 判读是否可以针对这条数据,过滤掉这个pipline
-     *
-     * @param message
-     * @return
-     */
-    protected BitSetCache.BitSet getFilterValue(IMessage message) {
-        if (this.logFingerprintFieldNames == null) {
-            return null;
-        }
-
-        return FingerprintCache.getInstance().getLogFingerprint(getOrCreateFingerNameSpace(),message,this.logFingerprintFieldNames);
-    }
-
-    /**
-     * 判读是否可以针对这条数据,过滤掉这个pipline
-     *
-     * @param msgKey
-     * @return
-     */
-    protected transient String fingerNameSpace;
-
-    protected String getOrCreateFingerNameSpace(){
-        if(fingerNameSpace==null){
-            fingerNameSpace= MapKeyUtil.createKey(this.getNameSpace(),this.getConfigureName());
-        }
-        return this.fingerNameSpace;
-    }
-
-    /**
-     * 动态装配子pipline
-     *
-     * @param piplines
-     * @param configurableService
-     */
-    protected void loadSubPiplines(List<ChainPipeline> piplines, IConfigurableService configurableService) {
-        List<PipelineSourceJoiner> joiners = configurableService.queryConfigurableByType(PipelineSourceJoiner.TYPE);
-        if (joiners == null) {
-            return;
-        }
-        for (PipelineSourceJoiner joiner : joiners) {
-            if (tableName.equals(joiner.getSourcePipelineName())) {
-                ChainPipeline pipline = configurableService.queryConfigurable(Pipeline.TYPE, joiner.getPipelineName());
-                if (pipline != null) {
-                    piplines.add(pipline);
-                }
-            }
-        }
-    }
-
-    /**
-     * pipline有没有发生过变化
-     *
-     * @param piplines
-     * @param piplines1
-     * @return
-     */
-    private boolean equalsPiplines(List<ChainPipeline> piplines, List<ChainPipeline> piplines1) {
-        if (piplines1 == null || piplines1.size() == 0) {
-            return false;
-        }
-        if (piplines == null || piplines.size() == 0) {
-            return false;
-        }
-        if (piplines.size() != piplines1.size()) {
-            return false;
-        }
-        for (ChainPipeline pipline : piplines) {
-            if (!piplines1.contains(pipline)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public void setTableName(String tableName) {
-        this.tableName = tableName;
-    }
-
-    public String getLogFingerprintFieldNames() {
-        return logFingerprintFieldNames;
-    }
-
-    public void setLogFingerprintFieldNames(String logFingerprintFieldNames) {
-        this.logFingerprintFieldNames = logFingerprintFieldNames;
-    }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSource.java
deleted file mode 100644
index eeed886..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSource.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.streams.common.channel.impl.transit;
-
-import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
-import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
-import org.apache.rocketmq.streams.common.topology.model.PipelineSourceJoiner;
-
-public class TransitSource extends AbstractSource {
-    protected String tableName;
-
-
-    @Override
-    public void addConfigurables(PipelineBuilder pipelineBuilder) {
-        PipelineSourceJoiner pipelineSourceJoiner=new PipelineSourceJoiner();
-        pipelineSourceJoiner.setSourcePipelineName(tableName);
-        pipelineSourceJoiner.setPipelineName(pipelineBuilder.getPipelineName());;
-        pipelineBuilder.addConfigurables(pipelineSourceJoiner);
-        pipelineBuilder.addConfigurables(this);
-    }
-
-
-
-    @Override protected boolean startSource() {
-        return true;
-    }
-
-    @Override public boolean supportNewSplitFind() {
-        return false;
-    }
-
-    @Override public boolean supportRemoveSplitFind() {
-        return false;
-    }
-
-    @Override public boolean supportOffsetRest() {
-        return false;
-    }
-
-    @Override protected boolean isNotDataSplit(String queueId) {
-        return false;
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public void setTableName(String tableName) {
-        this.tableName = tableName;
-    }
-}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileConfigureService.java
similarity index 99%
rename from rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
rename to rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileConfigureService.java
index 2d54a3e..518ed09 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileConfigureService.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.streams.configurable.service.impl;
+package org.apache.rocketmq.streams.db.configuable;
 
 import java.io.File;
 import java.util.ArrayList;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileSupportParentConfigureService.java
similarity index 96%
rename from rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java
rename to rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileSupportParentConfigureService.java
index 0b77b77..7329a2f 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileSupportParentConfigureService.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.streams.configurable.service.impl;
+package org.apache.rocketmq.streams.db.configuable;
 
 import com.google.auto.service.AutoService;
 import java.util.Properties;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemoryConfigureService.java
similarity index 98%
rename from rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java
rename to rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemoryConfigureService.java
index 3de2e4d..65a58de 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemoryConfigureService.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.streams.configurable.service.impl;
+package org.apache.rocketmq.streams.db.configuable;
 
 import java.util.ArrayList;
 import java.util.HashMap;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemorySupportParentConfigureService.java
similarity index 96%
rename from rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java
rename to rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemorySupportParentConfigureService.java
index 0c8765b..6b777cb 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemorySupportParentConfigureService.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.streams.configurable.service.impl;
+package org.apache.rocketmq.streams.db.configuable;
 
 import com.google.auto.service.AutoService;
 import java.util.Properties;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointExample.java
index 8f81a43..6c931b8 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointExample.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointExample.java
@@ -75,9 +75,9 @@ public class RemoteCheckpointExample {
                         JSONObject.parseObject((String) message);
                     } catch (Throwable t) {
                         // if can not convert to json, discard it.because all operator are base on json.
-                        return true;
+                        return false;
                     }
-                    return false;
+                    return true;
                 })
                 //must convert message to json.
                 .map(message -> JSONObject.parseObject((String) message))
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
index 2b823e4..4ab5bca 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
@@ -71,9 +71,9 @@ public class MultiStreamsExample {
                     JSONObject.parseObject((String) message);
                 } catch (Throwable t) {
                     // if can not convert to json, discard it.because all operator are base on json.
-                    return true;
+                    return false;
                 }
-                return false;
+                return true;
             })
             //must convert message to json.
             .map(message -> JSONObject.parseObject((String) message))