You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/03/11 07:54:47 UTC
[rocketmq-streams] branch main updated: [ISSUES 135] App can not start up with RocketMQ-streams (#136)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new fe3cc4e [ISSUES 135] App can not start up with RocketMQ-streams (#136)
fe3cc4e is described below
commit fe3cc4e2015aeeb7b5ecc784828a27bfd403dae3
Author: Ni Ze <un...@gmail.com>
AuthorDate: Fri Mar 11 15:54:42 2022 +0800
[ISSUES 135] App can not start up with RocketMQ-streams (#136)
* fix(jar) make rocketmq-streams runnable as a sdk
* fix(example)make example runnable
---
rocketmq-streams-channel-rocketmq/pom.xml | 4 -
.../common/channel/impl/CollectionSink.java | 55 -----
.../common/channel/impl/CollectionSinkBuilder.java | 50 -----
.../common/channel/impl/PrintChannelBuilder.java | 46 ----
.../channel/impl/file/FileChannelBuilder.java | 94 --------
.../channel/impl/memory/MemoryChannelBuilder.java | 64 ------
.../channel/impl/mutiltask/MutilSinkBuilder.java | 67 ------
.../channel/impl/mutiltask/MutilTaskSink.java | 87 -------
.../impl/transit/TransitChannelBuilder.java | 69 ------
.../common/channel/impl/transit/TransitSink.java | 250 ---------------------
.../common/channel/impl/transit/TransitSource.java | 66 ------
.../db/configuable}/FileConfigureService.java | 2 +-
.../FileSupportParentConfigureService.java | 2 +-
.../db/configuable}/MemoryConfigureService.java | 2 +-
.../MemorySupportParentConfigureService.java | 2 +-
.../checkpoint/RemoteCheckpointExample.java | 4 +-
.../mutilconsumer/MultiStreamsExample.java | 4 +-
17 files changed, 8 insertions(+), 860 deletions(-)
diff --git a/rocketmq-streams-channel-rocketmq/pom.xml b/rocketmq-streams-channel-rocketmq/pom.xml
index cad1c1b..d99a574 100644
--- a/rocketmq-streams-channel-rocketmq/pom.xml
+++ b/rocketmq-streams-channel-rocketmq/pom.xml
@@ -31,10 +31,6 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-streams-commons</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-streams-serviceloader</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java
deleted file mode 100644
index d3431cc..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSink.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
-import org.apache.rocketmq.streams.common.context.IMessage;
-
-/**
- * @description just support json object
- */
-public class CollectionSink extends AbstractSink {
-
- List<Object> data;
-
- public CollectionSink() {
- data = new ArrayList<>();
- }
-
- public CollectionSink(List<Object> data) {
- this.data = data;
- }
-
- @Override
- protected synchronized boolean batchInsert(List<IMessage> messages) {
- for (IMessage msg : messages) {
- if (msg.isJsonMessage()) {
- data.add(msg.getMessageBody().toJSONString());
- } else {
- data.add(msg.getMessageValue());
- }
- }
-
- return false;
- }
-
- public List<Object> getData() {
- return data;
- }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java
deleted file mode 100644
index f2d67a0..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl;
-
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-
-/**
- * @description
- */
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = CollectionSinkBuilder.TYPE)
-public class CollectionSinkBuilder implements IChannelBuilder {
-
- public static final String TYPE = "collection";
-
- @Override
- public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
- return new CollectionSource();
- }
-
- @Override
- public String getType() {
- return TYPE;
- }
-
- @Override
- public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
- return new CollectionSink();
- }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/PrintChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/PrintChannelBuilder.java
deleted file mode 100644
index a48e930..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/PrintChannelBuilder.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl;
-
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = PrintChannelBuilder.TYPE, aliasName = "OutputPrintChannel")
-public class PrintChannelBuilder implements IChannelBuilder {
- public static final String TYPE = "print";
-
- @Override
- public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
- throw new RuntimeException("can not support this method");
- }
-
- @Override
- public String getType() {
- return TYPE;
- }
-
- @Override
- public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
- return new OutputPrintChannel();
- }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileChannelBuilder.java
deleted file mode 100644
index eb84689..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileChannelBuilder.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl.file;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.impl.memory.MemoryChannel;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = FileChannelBuilder.TYPE, aliasName = "FileSource")
-public class FileChannelBuilder extends AbstractSupportShuffleChannelBuilder {
-
- public static final String TYPE = "file";
-
- /**
- * @param namespace
- * @param name
- * @param properties
- * @return
- */
- @Override
- public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
-
- FileSource fileSource = (FileSource) ConfigurableUtil.create(FileSource.class.getName(), namespace, name, createFormatProperty(properties), null);
- return fileSource;
- }
-
- @Override
- public String getType() {
- return TYPE;
- }
-
- @Override
- public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
- FileSink fileSink = (FileSink) ConfigurableUtil.create(FileSink.class.getName(), namespace, name, createFormatProperty(properties), null);
- return fileSink;
- }
-
- /**
- * 创建标准的属性文件
- *
- * @param properties
- * @return
- */
- @Override
- protected JSONObject createFormatProperty(Properties properties) {
- JSONObject formatProperties = new JSONObject();
- for (Object object : properties.keySet()) {
- String key = (String) object;
- if ("type".equals(key)) {
- continue;
- }
- formatProperties.put(key, properties.get(key));
- }
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "filePath", "filePath");
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "filePath", "filepath");
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "filePath", "fileName");
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "thread.max.count");
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "maxthread");
- return formatProperties;
- }
-
- @Override
- public ISource copy(ISource pipelineSource) {
- return new MemoryChannel();
- }
-
- @Override
- public ISink createBySource(ISource pipelineSource) {
- return new MemoryChannel();
- }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannelBuilder.java
deleted file mode 100644
index 720e3e1..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannelBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl.memory;
-
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = MemoryChannelBuilder.TYPE, aliasName = "MemorySource")
-public class MemoryChannelBuilder extends AbstractSupportShuffleChannelBuilder {
-
- public static final String TYPE = "memory";
-
- /**
- * @param namespace
- * @param name
- * @param properties
- * @return
- */
- @Override
- public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
- return new MemorySource();
- }
-
- @Override
- public String getType() {
- return TYPE;
- }
-
- @Override
- public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
- return new MemorySink();
- }
-
- @Override
- public ISource copy(ISource pipelineSource) {
- return new MemoryChannel();
- }
-
- @Override
- public ISink createBySource(ISource pipelineSource) {
- return new MemoryChannel();
- }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilSinkBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilSinkBuilder.java
deleted file mode 100644
index 428a619..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilSinkBuilder.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl.mutiltask;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = MutilSinkBuilder.SINK_TYPE, aliasName = "tasks",name="MutilTaskSink")
-public class MutilSinkBuilder implements IChannelBuilder {
- public static final String SINK_TYPE="mutil_task";
- @Override public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
- throw new RuntimeException("can not support this method");
- }
-
-
-
- @Override public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
- MutilTaskSink sink = (MutilTaskSink) ConfigurableUtil.create(MutilTaskSink.class.getName(), namespace, name, createFormatProperty(properties), null);
- return sink;
- }
-
-
-
- protected JSONObject createFormatProperty(Properties properties) {
- JSONObject formatProperties = new JSONObject();
- for (Object object : properties.keySet()) {
- String key = (String) object;
- if ("type".equals(key)) {
- continue;
- }
- formatProperties.put(key, properties.getProperty(key));
- }
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "configureName", "taskName");
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "configureName", "name");
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "configureName", "task_name");
- return formatProperties;
- }
-
-
- @Override
- public String getType() {
- return MutilSinkBuilder.SINK_TYPE;
- }
-
-
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java
deleted file mode 100644
index 5ad76de..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/mutiltask/MutilTaskSink.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.common.channel.impl.mutiltask;
-
-import java.util.List;
-import java.util.Set;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
-import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
-import org.apache.rocketmq.streams.common.context.Context;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.topology.task.StreamsTask;
-
-public class MutilTaskSink extends AbstractSink implements IAfterConfigurableRefreshListener {
-
- protected transient StreamsTask streamsTask;
- protected String taskName;
-
- @Override public boolean batchAdd(IMessage message) {
- Context context = new Context(message);
- streamsTask.doMessage(message, context);
- return true;
- }
-
- @Override public boolean batchSave(List<IMessage> messages) {
- if (messages != null) {
- for (IMessage message : messages) {
- batchAdd(message);
- }
- }
- return true;
- }
-
- @Override public boolean flush(Set<String> splitIds) {
- return true;
- }
-
- @Override public boolean flush(String... splitIds) {
- return true;
- }
-
- @Override protected boolean batchInsert(List<IMessage> messages) {
- throw new RuntimeException("can not support this method");
- }
-
- @Override public void closeAutoFlush() {
-
- }
-
- @Override public boolean flush() {
- return true;
- }
-
- @Override protected boolean initConfigurable() {
- return true;
- }
-
- @Override public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
- streamsTask = new StreamsTask();
- streamsTask.setConfigureName(taskName);
- streamsTask.setNameSpace(getNameSpace());
- streamsTask.init();
- streamsTask.doProcessAfterRefreshConfigurable(configurableService);
- }
-
- public String getTaskName() {
- return taskName;
- }
-
- public void setTaskName(String taskName) {
- this.taskName = taskName;
- }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java
deleted file mode 100644
index 96dee19..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitChannelBuilder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.streams.common.channel.impl.transit;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.auto.service.AutoService;
-import java.util.Properties;
-import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
-import org.apache.rocketmq.streams.common.channel.sink.ISink;
-import org.apache.rocketmq.streams.common.channel.source.ISource;
-import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
-
-@AutoService(IChannelBuilder.class)
-@ServiceName(value = TransitChannelBuilder.TYPE, aliasName = "cache_table")
-public class TransitChannelBuilder implements IChannelBuilder {
- public static final String TYPE = "transit";
-
- @Override public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
- return (TransitSource) ConfigurableUtil.create(TransitSource.class.getName(), namespace, name, createFormatProperty(properties), null);
- }
-
- @Override public String getType() {
- return TYPE;
- }
-
- @Override public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
- return (TransitSink) ConfigurableUtil.create(TransitSink.class.getName(), namespace, name, createFormatProperty(properties), null);
- }
-
- /**
- * 创建标准的属性文件
- *
- * @param properties
- * @return
- */
- protected JSONObject createFormatProperty(Properties properties) {
- JSONObject formatProperties = new JSONObject();
- for (Object object : properties.keySet()) {
- String key = (String) object;
- if ("type".equals(key)) {
- continue;
- }
- formatProperties.put(key, properties.getProperty(key));
- }
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "logFingerprintFieldNames", "filterFieldNames");
-
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "logFingerprintFieldNames", "filterfieldnames");
- IChannelBuilder.formatPropertiesName(formatProperties, properties, "tableName", "sql_create_table_name");
-// IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "thread.max.count");
- return formatProperties;
- }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSink.java
deleted file mode 100644
index d48526a..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSink.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.streams.common.channel.impl.transit;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.rocketmq.streams.common.cache.compress.BitSetCache;
-import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
-import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
-import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
-import org.apache.rocketmq.streams.common.context.Context;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.optimization.MessageGlobleTrace;
-import org.apache.rocketmq.streams.common.optimization.fingerprint.FingerprintCache;
-import org.apache.rocketmq.streams.common.topology.ChainPipeline;
-import org.apache.rocketmq.streams.common.topology.model.Pipeline;
-import org.apache.rocketmq.streams.common.topology.model.PipelineSourceJoiner;
-import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
-
-public class TransitSink extends AbstractSink implements IAfterConfigurableRefreshListener {
- private static final Log LOG = LogFactory.getLog(TransitSink.class);
- protected transient List<ChainPipeline> piplines = new ArrayList<>();
- protected String tableName;
- @ENVDependence
- protected String logFingerprintFieldNames;//config log finger
-
- @Override public boolean batchAdd(IMessage message) {
- boolean onlyOne = piplines.size() == 1;
- int index = 0;
- //可以启动重复过滤,把日志中的必须字段抽取出来,做去重,如果某个日志,在某个pipline执行不成功,下次类似日志过来,直接过滤掉
- BitSetCache.BitSet bitSet = getFilterValue(message);
- if(bitSet==null&&logFingerprintFieldNames!=null){
- bitSet=new BitSetCache.BitSet(piplines.size());
- }
- for (ChainPipeline pipline : piplines) {
- if (bitSet!=null&&bitSet.get(index)) {
- continue;
- }
-
- IMessage copyMessage = message;
- if (!onlyOne) {
- copyMessage = message.deepCopy();
- }
- Context newContext = new Context(copyMessage);
- try {
-
- pipline.doMessage(copyMessage, newContext);
- if (!MessageGlobleTrace.existFinishBranch(copyMessage)) {
- if(bitSet!=null){
- bitSet.set(index);
- addNoFireMessage(message, bitSet);
- }
-
- }
-
- } catch (Exception e) {
- LOG.error("pipline execute error " + pipline.getConfigureName(), e);
- }
- index++;
-
- }
- return true;
- }
-
-
-
- @Override public boolean checkpoint(Set<String> splitIds) {
- return super.checkpoint(splitIds);
- }
-
- @Override public boolean checkpoint(String... splitIds) {
- return super.checkpoint(splitIds);
- }
-
- @Override protected boolean batchInsert(List<IMessage> messages) {
- return false;
- }
-
- @Override protected boolean initConfigurable() {
- boolean success= super.initConfigurable();
- this.messageCache=null;
- return true;
- }
-
- @Override public boolean flush(Set<String> splitIds) {
- return true;
- }
-
- @Override public boolean flush(String... splitIds) {
- return true;
- }
-
- @Override public boolean flush() {
- return true;
- }
-
- @Override public void openAutoFlush() {
- }
-
- @Override public void closeAutoFlush() {
- }
-
- @Override public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
- List<ChainPipeline> piplines = new ArrayList<>();
-
- loadSubPiplines(piplines, configurableService);//通过PiplineSourceJoiner装载子pipline
-
-
- /**
- * 做排序,确保pipline对应的index和messageRepeateFileter 的一致
- */
- Collections.sort(piplines, new Comparator<ChainPipeline>() {
- @Override
- public int compare(ChainPipeline o1, ChainPipeline o2) {
- return o1.getConfigureName().compareTo(o2.getConfigureName());
- }
- });
- if (!equalsPiplines(this.piplines, piplines)) {
- this.piplines = piplines;
-
- }
- }
-
- /**
- * 如果确定这个message,在某个pipline不触发,则记录下来,下次直接跳过,不执行
- *
- * @param message
- * @param message
- */
- protected void addNoFireMessage(IMessage message, BitSetCache.BitSet bitSet) {
- if (this.logFingerprintFieldNames == null) {
- return;
- }
-
- FingerprintCache.getInstance().addLogFingerprint(getOrCreateFingerNameSpace(),message,bitSet,this.logFingerprintFieldNames);
- }
-
-
- /**
- * 判读是否可以针对这条数据,过滤掉这个pipline
- *
- * @param message
- * @return
- */
- protected BitSetCache.BitSet getFilterValue(IMessage message) {
- if (this.logFingerprintFieldNames == null) {
- return null;
- }
-
- return FingerprintCache.getInstance().getLogFingerprint(getOrCreateFingerNameSpace(),message,this.logFingerprintFieldNames);
- }
-
- /**
- * 判读是否可以针对这条数据,过滤掉这个pipline
- *
- * @param msgKey
- * @return
- */
- protected transient String fingerNameSpace;
-
- protected String getOrCreateFingerNameSpace(){
- if(fingerNameSpace==null){
- fingerNameSpace= MapKeyUtil.createKey(this.getNameSpace(),this.getConfigureName());
- }
- return this.fingerNameSpace;
- }
-
- /**
- * 动态装配子pipline
- *
- * @param piplines
- * @param configurableService
- */
- protected void loadSubPiplines(List<ChainPipeline> piplines, IConfigurableService configurableService) {
- List<PipelineSourceJoiner> joiners = configurableService.queryConfigurableByType(PipelineSourceJoiner.TYPE);
- if (joiners == null) {
- return;
- }
- for (PipelineSourceJoiner joiner : joiners) {
- if (tableName.equals(joiner.getSourcePipelineName())) {
- ChainPipeline pipline = configurableService.queryConfigurable(Pipeline.TYPE, joiner.getPipelineName());
- if (pipline != null) {
- piplines.add(pipline);
- }
- }
- }
- }
-
- /**
- * pipline有没有发生过变化
- *
- * @param piplines
- * @param piplines1
- * @return
- */
- private boolean equalsPiplines(List<ChainPipeline> piplines, List<ChainPipeline> piplines1) {
- if (piplines1 == null || piplines1.size() == 0) {
- return false;
- }
- if (piplines == null || piplines.size() == 0) {
- return false;
- }
- if (piplines.size() != piplines1.size()) {
- return false;
- }
- for (ChainPipeline pipline : piplines) {
- if (!piplines1.contains(pipline)) {
- return false;
- }
- }
- return true;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public String getLogFingerprintFieldNames() {
- return logFingerprintFieldNames;
- }
-
- public void setLogFingerprintFieldNames(String logFingerprintFieldNames) {
- this.logFingerprintFieldNames = logFingerprintFieldNames;
- }
-}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSource.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSource.java
deleted file mode 100644
index eeed886..0000000
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/transit/TransitSource.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.streams.common.channel.impl.transit;
-
-import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
-import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
-import org.apache.rocketmq.streams.common.topology.model.PipelineSourceJoiner;
-
-public class TransitSource extends AbstractSource {
- protected String tableName;
-
-
- @Override
- public void addConfigurables(PipelineBuilder pipelineBuilder) {
- PipelineSourceJoiner pipelineSourceJoiner=new PipelineSourceJoiner();
- pipelineSourceJoiner.setSourcePipelineName(tableName);
- pipelineSourceJoiner.setPipelineName(pipelineBuilder.getPipelineName());;
- pipelineBuilder.addConfigurables(pipelineSourceJoiner);
- pipelineBuilder.addConfigurables(this);
- }
-
-
-
- @Override protected boolean startSource() {
- return true;
- }
-
- @Override public boolean supportNewSplitFind() {
- return false;
- }
-
- @Override public boolean supportRemoveSplitFind() {
- return false;
- }
-
- @Override public boolean supportOffsetRest() {
- return false;
- }
-
- @Override protected boolean isNotDataSplit(String queueId) {
- return false;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-}
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileConfigureService.java
similarity index 99%
rename from rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
rename to rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileConfigureService.java
index 2d54a3e..518ed09 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileConfigureService.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileConfigureService.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.streams.configurable.service.impl;
+package org.apache.rocketmq.streams.db.configuable;
import java.io.File;
import java.util.ArrayList;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileSupportParentConfigureService.java
similarity index 96%
rename from rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java
rename to rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileSupportParentConfigureService.java
index 0b77b77..7329a2f 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/FileSupportParentConfigureService.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/FileSupportParentConfigureService.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.streams.configurable.service.impl;
+package org.apache.rocketmq.streams.db.configuable;
import com.google.auto.service.AutoService;
import java.util.Properties;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemoryConfigureService.java
similarity index 98%
rename from rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java
rename to rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemoryConfigureService.java
index 3de2e4d..65a58de 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemoryConfigureService.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemoryConfigureService.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.streams.configurable.service.impl;
+package org.apache.rocketmq.streams.db.configuable;
import java.util.ArrayList;
import java.util.HashMap;
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemorySupportParentConfigureService.java
similarity index 96%
rename from rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java
rename to rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemorySupportParentConfigureService.java
index 0c8765b..6b777cb 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/impl/MemorySupportParentConfigureService.java
+++ b/rocketmq-streams-db-operator/src/main/java/org/apache/rocketmq/streams/db/configuable/MemorySupportParentConfigureService.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.streams.configurable.service.impl;
+package org.apache.rocketmq.streams.db.configuable;
import com.google.auto.service.AutoService;
import java.util.Properties;
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointExample.java
index 8f81a43..6c931b8 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointExample.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/checkpoint/RemoteCheckpointExample.java
@@ -75,9 +75,9 @@ public class RemoteCheckpointExample {
JSONObject.parseObject((String) message);
} catch (Throwable t) {
// if can not convert to json, discard it.because all operator are base on json.
- return true;
+ return false;
}
- return false;
+ return true;
})
//must convert message to json.
.map(message -> JSONObject.parseObject((String) message))
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
index 2b823e4..4ab5bca 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java
@@ -71,9 +71,9 @@ public class MultiStreamsExample {
JSONObject.parseObject((String) message);
} catch (Throwable t) {
// if can not convert to json, discard it.because all operator are base on json.
- return true;
+ return false;
}
- return false;
+ return true;
})
//must convert message to json.
.map(message -> JSONObject.parseObject((String) message))