You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/23 07:23:43 UTC
[incubator-tubemq] branch TUBEMQ-455 updated: [TUBEMQ-473] define
source/channel/sink interface with design comments
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
new 382ef10 [TUBEMQ-473] define source/channel/sink interface with design comments
382ef10 is described below
commit 382ef10286278b4ff0fd1b3c049e65cbb22ec6a7
Author: yuanbo <yu...@apache.org>
AuthorDate: Wed Dec 23 15:06:49 2020 +0800
[TUBEMQ-473] define source/channel/sink interface with design comments
---
.../org/apache/tubemq/agent/conf/JobProfile.java | 80 ++++++++++++++++++++++
.../apache/tubemq/agent/conf/TriggerProfile.java | 34 +++++++++
.../tubemq/agent/constants/CommonConstants.java | 74 ++++++++++++++++++++
.../tubemq/agent/constants/JobConstants.java | 40 +++++++++++
.../org/apache/tubemq/agent/plugin/Channel.java | 45 ++++++++++++
.../org/apache/tubemq/agent/plugin/Message.java | 38 ++++++++++
.../org/apache/tubemq/agent/plugin/Reader.java | 33 +++++++++
.../java/org/apache/tubemq/agent/plugin/Sink.java | 27 ++++++++
.../org/apache/tubemq/agent/plugin/Source.java | 31 +++++++++
.../java/org/apache/tubemq/agent/plugin/Stage.java | 34 +++++++++
.../org/apache/tubemq/agent/plugin/Trigger.java | 46 +++++++++++++
11 files changed, 482 insertions(+)
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/JobProfile.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/JobProfile.java
new file mode 100644
index 0000000..b14c495
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/JobProfile.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.conf;
+
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_CHANNEL;
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_ID;
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_NAME;
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_SINK;
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_SOURCE;
+
+import com.google.gson.Gson;
+
+/**
+ * Job profile: a {@link Configuration} describing the properties of one job. Provides static
+ * parse factories, a required-key check and a Gson dump of the config storage (toJsonStr).
+ */
+public class JobProfile extends Configuration {
+
+ private final Gson gson = new Gson();
+
+ /**
+ * Parse a JSON string into a job profile.
+ *
+ * @param jsonStr - JSON text handed to loadJsonStrResource.
+ * @return a new JobProfile loaded from the string.
+ */
+ public static JobProfile parseJsonStr(String jsonStr) {
+ JobProfile conf = new JobProfile();
+ conf.loadJsonStrResource(jsonStr);
+ return conf;
+ }
+
+ /**
+ * Parse a properties file into a job profile.
+ *
+ * @param fileName - file name handed to loadPropertiesResource.
+ * @return a new JobProfile loaded from the file.
+ */
+ public static JobProfile parsePropertiesFile(String fileName) {
+ JobProfile conf = new JobProfile();
+ conf.loadPropertiesResource(fileName);
+ return conf;
+ }
+
+ /**
+ * Parse a JSON file into a job profile.
+ * @param fileName - JSON file name handed to loadJsonResource.
+ * @return a new JobProfile loaded from the file.
+ */
+ public static JobProfile parseJsonFile(String fileName) {
+ JobProfile conf = new JobProfile();
+ conf.loadJsonResource(fileName);
+ return conf;
+ }
+
+ /**
+ * Check whether all required keys exist.
+ *
+ * @return true only if hasKey holds for JOB_ID, JOB_SOURCE, JOB_SINK, JOB_CHANNEL and JOB_NAME.
+ */
+ public boolean allRequiredKeyExist() {
+ return hasKey(JOB_ID) && hasKey(JOB_SOURCE)
+ && hasKey(JOB_SINK) && hasKey(JOB_CHANNEL) && hasKey(JOB_NAME);
+ }
+
+ public String toJsonStr() {
+ return gson.toJson(getConfigStorage());
+ }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/TriggerProfile.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/TriggerProfile.java
new file mode 100644
index 0000000..d84ca15
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/TriggerProfile.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.conf;
+
+import org.apache.tubemq.agent.constants.JobConstants;
+
+/**
+ * Profile used by a trigger: a job profile whose required keys are only JOB_TRIGGER and
+ * JOB_ID. Its static parseJsonStr hides JobProfile.parseJsonStr and returns a TriggerProfile.
+ */
+public class TriggerProfile extends JobProfile {
+
+ @Override
+ public boolean allRequiredKeyExist() {
+ return hasKey(JobConstants.JOB_TRIGGER) && hasKey(JobConstants.JOB_ID);
+ }
+
+ public static TriggerProfile parseJsonStr(String jsonStr) {
+ TriggerProfile conf = new TriggerProfile();
+ conf.loadJsonStrResource(jsonStr);
+ return conf;
+ }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/CommonConstants.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/CommonConstants.java
new file mode 100644
index 0000000..03f6510
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/CommonConstants.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.constants;
+
+import org.apache.tubemq.agent.utils.AgentUtils;
+
+public class CommonConstants {
+ public static final String BUS_TDMANAGER_HOST = "bus.tdmanager.host";
+ public static final String BUS_TDMANAGER_PORT = "bus.tdmanager.port";
+
+ public static final String BUS_NET_TAG = "bus.net.tag";
+ public static final String DEFAULT_BUS_NET_TAG = "";
+
+ public static final String BUS_BID = "bus.bid";
+
+ public static final String BUS_LOCALHOST = "bus.localHost";
+ public static final String DEFAULT_BUS_LOCALHOST = AgentUtils.getLocalIP();
+
+ public static final String BUS_IS_LOCAL_VISIT = "bus.isLocalVisit";
+ public static final boolean DEFAULT_BUS_IS_LOCAL_VISIT = true;
+
+ public static final String BUS_TOTAL_ASYNC_BUF_SIZE = "bus.total.async.bus.size";
+ public static final int DEFAULT_BUS_TOTAL_ASYNC_BUF_SIZE = 200 * 1024 * 1024;
+
+ public static final String BUS_ALIVE_CONNECTION_NUM = "bus.alive.connection.num";
+ public static final int DEFAULT_BUS_ALIVE_CONNECTION_NUM = 10;
+
+ public static final String BUS_MSG_TYPE = "bus.msgType";
+ public static final int DEFAULT_BUS_MSG_TYPE = 7;
+
+ public static final String BUS_IS_COMPRESS = "bus.is.compress";
+ public static final boolean DEFAULT_BUS_IS_COMPRESS = true;
+
+ public static final String BUS_MAX_SENDER_PER_BID = "bus.max.sender.per.pid";
+ public static final int DEFAULT_BUS_MAX_SENDER_PER_PID = 10;
+
+ // config key: max size of the message list (package)
+ public static final String BUS_PACKAGE_MAX_SIZE = "bus.package.maxSize";
+ // default max size of a single batch in bytes: 200000 (about 200KB).
+ public static final int DEFAULT_BUS_PACKAGE_MAX_SIZE = 200000;
+
+ public static final String BUS_TID_QUEUE_MAX_NUMBER = "bus.tid.queue.maxNumber";
+ public static final int DEFAULT_BUS_TID_QUEUE_MAX_NUMBER = 10000;
+
+ public static final String BUS_PACKAGE_MAX_TIMEOUT_MS = "bus.package.maxTimeout.ms";
+ public static final int DEFAULT_BUS_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000;
+
+ public static final String BUS_BATCH_FLUSH_INTERVAL = "bus.batch.flush.interval";
+ public static final int DEFAULT_BUS_BATCH_FLUSH_INTERVAL = 2 * 1000;
+
+ public static final String BUS_SENDER_MAX_TIMEOUT = "bus.sender.maxTimeout";
+ // default max sender timeout, in seconds.
+ public static final int DEFAULT_BUS_SENDER_MAX_TIMEOUT = 20;
+
+ public static final String BUS_SENDER_MAX_RETRY = "bus.sender.maxRetry";
+ public static final int DEFAULT_BUS_SENDER_MAX_RETRY = 5;
+
+ public static final String BUS_IS_FILE = "bus.isFile";
+ public static final boolean DEFAULT_IS_FILE = true;
+
+ public static final String BUS_RETRY_SLEEP = "bus.retry.sleep";
+ public static final long DEFAULT_BUS_RETRY_SLEEP = 500;
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/JobConstants.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/JobConstants.java
new file mode 100644
index 0000000..a4bec9d
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/JobConstants.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.constants;
+
+
+/**
+ * Config keys and defaults for a single job; inherits the bus constants of CommonConstants.
+ */
+public class JobConstants extends CommonConstants {
+
+ // config key: job id
+ public static final String JOB_ID = "job.id";
+
+ public static final String JOB_SOURCE = "job.source";
+ public static final String JOB_SINK = "job.sink";
+ public static final String JOB_CHANNEL = "job.channel";
+ public static final String JOB_TRIGGER = "job.trigger";
+ public static final String JOB_NAME = "job.name";
+ public static final String DEFAULT_JOB_NAME = "default";
+ public static final String JOB_DESCRIPTION = "job.description";
+ public static final String DEFAULT_JOB_DESCRIPTION = "default job description";
+
+ // config key: job type, delete/add
+ public static final String JOB_TYPE = "job.type";
+
+ public static final String JOB_CHECKPOINT = "job.checkpoint";
+
+ public static final String JOB_PATH_PATTERN = "job.path.pattern";
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Channel.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Channel.java
new file mode 100644
index 0000000..243f5a9
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Channel.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Channel is used as a data buffer between source and sink.
+ */
+public interface Channel extends Stage {
+
+
+ /**
+ * Write a message into the channel.
+ *
+ * @param message - message to write
+ */
+ void push(Message message);
+
+ /**
+ * Write a message with a timeout ({@code timeout} is expressed in {@code unit}).
+ * @param message - message to write
+ * @return presumably true if the message was accepted in time - TODO confirm with implementations
+ */
+ boolean push(Message message, long timeout, TimeUnit unit);
+
+ /**
+ * Read a message with a timeout ({@code timeout} is expressed in {@code unit}).
+ *
+ * @return - message; the result on timeout is not specified by this interface
+ */
+ Message pull(long timeout, TimeUnit unit);
+
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Message.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Message.java
new file mode 100644
index 0000000..3cfdcf0
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Message.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+import java.util.Map;
+
+/**
+ * Message used for inner data transfer. A message is divided into
+ * two parts, header and body: the header holds the attributes of the
+ * message and the body is the content of the message.
+ */
+public interface Message {
+
+ /**
+ * Get the data content (body) of the message.
+ *
+ * @return body as bytes
+ */
+ byte[] getBody();
+
+ /**
+ * Get the data attributes (header) of the message.
+ *
+ * @return header as a map of string keys to string values
+ */
+ Map<String, String> getHeader();
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Reader.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Reader.java
new file mode 100644
index 0000000..00f1c44
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Reader.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+/**
+ * Reader reads data and reports whether the reading action is finished. It is called at
+ * Task level.
+ */
+public interface Reader extends Stage {
+
+ /**
+ * Read one message.
+ *
+ * @return - the message that was read
+ */
+ Message read();
+
+ /**
+ * Tell whether this reader has finished reading.
+ */
+ boolean isFinished();
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Sink.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Sink.java
new file mode 100644
index 0000000..5239e40
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Sink.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+/**
+ * Sink writes data to a remote data center.
+ */
+public interface Sink extends Stage {
+
+ /**
+ * Write a message into the data center.
+ *
+ * @param message - message to write
+ */
+ void write(Message message);
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Source.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Source.java
new file mode 100644
index 0000000..2f1e94c
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Source.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+import java.util.List;
+import org.apache.tubemq.agent.conf.JobProfile;
+
+/**
+ * A source can be split into multiple readers.
+ */
+public interface Source {
+
+ /**
+ * Split the source into a list of readers.
+ *
+ * @param conf job conf
+ * @return - list of readers
+ */
+ List<Reader> split(JobProfile conf);
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Stage.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Stage.java
new file mode 100644
index 0000000..278265d
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Stage.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+import org.apache.tubemq.agent.conf.JobProfile;
+
+/**
+ * Stage definition: the init/destroy lifecycle shared by Channel, Reader and Sink.
+ */
+public interface Stage {
+
+ /**
+ * Init this stage with the given job config.
+ *
+ * @param jobConf - job config
+ */
+ void init(JobProfile jobConf);
+
+ /**
+ * Destroy this stage.
+ */
+ void destroy();
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Trigger.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Trigger.java
new file mode 100644
index 0000000..5765fd7
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Trigger.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+import java.io.IOException;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.conf.TriggerProfile;
+
+/**
+ * Trigger interface, which generates jobs when a condition is met.
+ */
+public interface Trigger {
+
+ /**
+ * Init the trigger from a trigger profile; may throw IOException.
+ * @param profile - trigger profile.
+ */
+ void init(TriggerProfile profile) throws IOException;
+
+ /**
+ * Run the trigger.
+ */
+ void run();
+
+ /**
+ * Destroy the trigger.
+ */
+ void destroy();
+
+ /**
+ * Fetch a job profile from the trigger.
+ * @return - job profile
+ */
+ JobProfile fetchJobProfile();
+}