You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/23 07:23:43 UTC

[incubator-tubemq] branch TUBEMQ-455 updated: [TUBEMQ-473] define source/channel/sink interface with design comments

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
     new 382ef10  [TUBEMQ-473] define source/channel/sink interface with design comments
382ef10 is described below

commit 382ef10286278b4ff0fd1b3c049e65cbb22ec6a7
Author: yuanbo <yu...@apache.org>
AuthorDate: Wed Dec 23 15:06:49 2020 +0800

    [TUBEMQ-473] define source/channel/sink interface with design comments
---
 .../org/apache/tubemq/agent/conf/JobProfile.java   | 80 ++++++++++++++++++++++
 .../apache/tubemq/agent/conf/TriggerProfile.java   | 34 +++++++++
 .../tubemq/agent/constants/CommonConstants.java    | 74 ++++++++++++++++++++
 .../tubemq/agent/constants/JobConstants.java       | 40 +++++++++++
 .../org/apache/tubemq/agent/plugin/Channel.java    | 45 ++++++++++++
 .../org/apache/tubemq/agent/plugin/Message.java    | 38 ++++++++++
 .../org/apache/tubemq/agent/plugin/Reader.java     | 33 +++++++++
 .../java/org/apache/tubemq/agent/plugin/Sink.java  | 27 ++++++++
 .../org/apache/tubemq/agent/plugin/Source.java     | 31 +++++++++
 .../java/org/apache/tubemq/agent/plugin/Stage.java | 34 +++++++++
 .../org/apache/tubemq/agent/plugin/Trigger.java    | 46 +++++++++++++
 11 files changed, 482 insertions(+)

diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/JobProfile.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/JobProfile.java
new file mode 100644
index 0000000..b14c495
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/JobProfile.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.conf;
+
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_CHANNEL;
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_ID;
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_NAME;
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_SINK;
+import static org.apache.tubemq.agent.constants.JobConstants.JOB_SOURCE;
+
+import com.google.gson.Gson;
+
+/**
+ * job profile which contains details describing properties of one job.
+ *
+ */
+public class JobProfile extends Configuration {
+
+    private final Gson gson = new Gson();
+
+    /**
+     * parse json string to configuration instance。
+     *
+     * @param jsonStr
+     * @return job configuration
+     */
+    public static JobProfile parseJsonStr(String jsonStr) {
+        JobProfile conf = new JobProfile();
+        conf.loadJsonStrResource(jsonStr);
+        return conf;
+    }
+
+    /**
+     * parse properties file
+     *
+     * @param fileName - file name.
+     * @return jobConfiguration.
+     */
+    public static JobProfile parsePropertiesFile(String fileName) {
+        JobProfile conf = new JobProfile();
+        conf.loadPropertiesResource(fileName);
+        return conf;
+    }
+
+    /**
+     * parse json file.
+     * @param fileName - json file name.
+     * @return jobConfiguration.
+     */
+    public static JobProfile parseJsonFile(String fileName) {
+        JobProfile conf = new JobProfile();
+        conf.loadJsonResource(fileName);
+        return conf;
+    }
+
+    /**
+     * check whether required keys exists.
+     *
+     * @return return true if all required keys exists else false.
+     */
+    public boolean allRequiredKeyExist() {
+        return hasKey(JOB_ID) && hasKey(JOB_SOURCE)
+            && hasKey(JOB_SINK) && hasKey(JOB_CHANNEL) && hasKey(JOB_NAME);
+    }
+
+    public String toJsonStr() {
+        return gson.toJson(getConfigStorage());
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/TriggerProfile.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/TriggerProfile.java
new file mode 100644
index 0000000..d84ca15
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/conf/TriggerProfile.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.conf;
+
+import org.apache.tubemq.agent.constants.JobConstants;
+
+/**
+ * profile used in trigger. Trigger profile is a special job profile with
+ * trigger config.
+ */
+public class TriggerProfile extends JobProfile {
+
+    @Override
+    public boolean allRequiredKeyExist() {
+        return hasKey(JobConstants.JOB_TRIGGER) && hasKey(JobConstants.JOB_ID);
+    }
+
+    public static TriggerProfile parseJsonStr(String jsonStr) {
+        TriggerProfile conf = new TriggerProfile();
+        conf.loadJsonStrResource(jsonStr);
+        return conf;
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/CommonConstants.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/CommonConstants.java
new file mode 100644
index 0000000..03f6510
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/CommonConstants.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.constants;
+
+import org.apache.tubemq.agent.utils.AgentUtils;
+
+public class CommonConstants {
+    public static final String BUS_TDMANAGER_HOST = "bus.tdmanager.host";
+    public static final String BUS_TDMANAGER_PORT = "bus.tdmanager.port";
+
+    public static final String BUS_NET_TAG = "bus.net.tag";
+    public static final String DEFAULT_BUS_NET_TAG = "";
+
+    public static final String BUS_BID = "bus.bid";
+
+    public static final String BUS_LOCALHOST = "bus.localHost";
+    public static final String DEFAULT_BUS_LOCALHOST = AgentUtils.getLocalIP();
+
+    public static final String BUS_IS_LOCAL_VISIT = "bus.isLocalVisit";
+    public static final boolean DEFAULT_BUS_IS_LOCAL_VISIT = true;
+
+    public static final String BUS_TOTAL_ASYNC_BUF_SIZE = "bus.total.async.bus.size";
+    public static final int DEFAULT_BUS_TOTAL_ASYNC_BUF_SIZE = 200 * 1024 * 1024;
+
+    public static final String BUS_ALIVE_CONNECTION_NUM = "bus.alive.connection.num";
+    public static final int DEFAULT_BUS_ALIVE_CONNECTION_NUM = 10;
+
+    public static final String BUS_MSG_TYPE = "bus.msgType";
+    public static final int DEFAULT_BUS_MSG_TYPE = 7;
+
+    public static final String BUS_IS_COMPRESS = "bus.is.compress";
+    public static final boolean DEFAULT_BUS_IS_COMPRESS = true;
+
+    public static final String BUS_MAX_SENDER_PER_BID = "bus.max.sender.per.pid";
+    public static final int DEFAULT_BUS_MAX_SENDER_PER_PID = 10;
+
+    // max size of message list
+    public static final String BUS_PACKAGE_MAX_SIZE = "bus.package.maxSize";
+    // max size of single batch in bytes, default is 200KB.
+    public static final int DEFAULT_BUS_PACKAGE_MAX_SIZE = 200000;
+
+    public static final String BUS_TID_QUEUE_MAX_NUMBER = "bus.tid.queue.maxNumber";
+    public static final int DEFAULT_BUS_TID_QUEUE_MAX_NUMBER = 10000;
+
+    public static final String BUS_PACKAGE_MAX_TIMEOUT_MS = "bus.package.maxTimeout.ms";
+    public static final int DEFAULT_BUS_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000;
+
+    public static final String BUS_BATCH_FLUSH_INTERVAL = "bus.batch.flush.interval";
+    public static final int DEFAULT_BUS_BATCH_FLUSH_INTERVAL = 2 * 1000;
+
+    public static final String BUS_SENDER_MAX_TIMEOUT = "bus.sender.maxTimeout";
+    // max timeout in seconds.
+    public static final int DEFAULT_BUS_SENDER_MAX_TIMEOUT = 20;
+
+    public static final String BUS_SENDER_MAX_RETRY = "bus.sender.maxRetry";
+    public static final int DEFAULT_BUS_SENDER_MAX_RETRY = 5;
+
+    public static final String BUS_IS_FILE = "bus.isFile";
+    public static final boolean DEFAULT_IS_FILE = true;
+
+    public static final String BUS_RETRY_SLEEP = "bus.retry.sleep";
+    public static final long DEFAULT_BUS_RETRY_SLEEP = 500;
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/JobConstants.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/JobConstants.java
new file mode 100644
index 0000000..a4bec9d
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/constants/JobConstants.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.constants;
+
+
+/**
+ * Basic config for a single job
+ */
+public class JobConstants extends CommonConstants {
+
+    // job id
+    public static final String JOB_ID = "job.id";
+
+    public static final String JOB_SOURCE = "job.source";
+    public static final String JOB_SINK = "job.sink";
+    public static final String JOB_CHANNEL = "job.channel";
+    public static final String JOB_TRIGGER = "job.trigger";
+    public static final String JOB_NAME = "job.name";
+    public static final String DEFAULT_JOB_NAME = "default";
+    public static final String JOB_DESCRIPTION = "job.description";
+    public static final String DEFAULT_JOB_DESCRIPTION = "default job description";
+
+    // job type, delete/add
+    public static final String JOB_TYPE = "job.type";
+
+    public static final String JOB_CHECKPOINT = "job.checkpoint";
+
+    public static final String JOB_PATH_PATTERN = "job.path.pattern";
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Channel.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Channel.java
new file mode 100644
index 0000000..243f5a9
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Channel.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Channel is used as data buffer between source and sink.
+ */
+public interface Channel extends Stage {
+
+
+    /**
+     * write message
+     *
+     * @param message - message
+     */
+    void push(Message message);
+
+    /**
+     * write message with timeout
+     *
+     * @param message - message
+     */
+    boolean push(Message message, long timeout, TimeUnit unit);
+
+    /**
+     * read message with timeout
+     *
+     * @return - message
+     */
+    Message pull(long timeout, TimeUnit unit);
+
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Message.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Message.java
new file mode 100644
index 0000000..3cfdcf0
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Message.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+import java.util.Map;
+
+/**
+ * Message used in inner-data transfer, message is divided into
+ * two parts, header and body. header is the attributes of message
+ * and body is the content of message.
+ */
+public interface Message {
+
+    /**
+     * Data content of message.
+     *
+     * @return bytes body
+     */
+    byte[] getBody();
+
+    /**
+     * Data attribute of message
+     *
+     * @return map header
+     */
+    Map<String, String> getHeader();
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Reader.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Reader.java
new file mode 100644
index 0000000..00f1c44
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Reader.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+/**
+ * Reader reads data and provides condition whether the reading action is finished. It's called at
+ * Task level.
+ */
+public interface Reader extends Stage {
+
+    /**
+     * Read message
+     *
+     * @return - message
+     */
+    Message read();
+
+    /**
+     * Whether finish reading
+     */
+    boolean isFinished();
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Sink.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Sink.java
new file mode 100644
index 0000000..5239e40
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Sink.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+/**
+ * Sink data to remote data center
+ */
+public interface Sink extends Stage {
+
+    /**
+     * Write data into data center
+     *
+     * @param message - message
+     */
+    void write(Message message);
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Source.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Source.java
new file mode 100644
index 0000000..2f1e94c
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Source.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+import java.util.List;
+import org.apache.tubemq.agent.conf.JobProfile;
+
+/**
+ * Source can be split into multiple reader.
+ */
+public interface Source {
+
+    /**
+     * Split source into a list of readers.
+     *
+     * @param conf job conf
+     * @return - list of reader
+     */
+    List<Reader> split(JobProfile conf);
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Stage.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Stage.java
new file mode 100644
index 0000000..278265d
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Stage.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+import org.apache.tubemq.agent.conf.JobProfile;
+
+/**
+ * Stage definition.
+ */
+public interface Stage {
+
+    /**
+     * Init job.
+     *
+     * @param jobConf - job config
+     */
+    void init(JobProfile jobConf);
+
+    /**
+     * Destroy job.
+     */
+    void destroy();
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Trigger.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Trigger.java
new file mode 100644
index 0000000..5765fd7
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/plugin/Trigger.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.plugin;
+
+import java.io.IOException;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.conf.TriggerProfile;
+
+/**
+ * Trigger interface, which generates job in condition.
+ */
+public interface Trigger {
+
+    /**
+     * init trigger by trigger profile
+     * @param profile - trigger profile.
+     */
+    void init(TriggerProfile profile) throws IOException;
+
+    /**
+     * run trigger.
+     */
+    void run();
+
+    /**
+     * destroy trigger.
+     */
+    void destroy();
+
+    /**
+     * fetch job profile from trigger
+     * @return - job profile
+     */
+    JobProfile fetchJobProfile();
+}