You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/24 10:53:37 UTC

[incubator-tubemq] branch TUBEMQ-455 updated: [TUBEMQ-479] add common channels and messages for agent

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
     new 8716317  [TUBEMQ-479] add common channels and messages for agent
8716317 is described below

commit 87163178da59c8e8683a350f8258d24beed41214
Author: yuanbo <yu...@apache.org>
AuthorDate: Thu Dec 24 16:18:38 2020 +0800

    [TUBEMQ-479] add common channels and messages for agent
---
 .../apache/tubemq/agent/channel/MemoryChannel.java | 80 ++++++++++++++++++++++
 .../apache/tubemq/agent/message/BusMessage.java    | 70 +++++++++++++++++++
 .../tubemq/agent/message/DefaultMessage.java       | 48 +++++++++++++
 .../apache/tubemq/agent/message/EndMessage.java    | 35 ++++++++++
 .../apache/tubemq/agent/{ => core}/AgentMain.java  |  2 +-
 5 files changed, 234 insertions(+), 1 deletion(-)

diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/channel/MemoryChannel.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/channel/MemoryChannel.java
new file mode 100644
index 0000000..87d7a64
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/channel/MemoryChannel.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.channel;
+
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.plugin.Channel;
+import org.apache.tubemq.agent.plugin.Message;
+
+/**
+ * {@link Channel} implementation that buffers messages in an in-memory
+ * {@link LinkedBlockingQueue}. The queue is only created by
+ * {@link #init(JobProfile)}; pushing a non-null message or pulling before
+ * that call dereferences a null queue.
+ */
+public class MemoryChannel implements Channel {
+
+    private LinkedBlockingQueue<Message> queue;
+
+    /**
+     * Creates the backing queue. Its capacity is the value returned by
+     * {@code jobConf.getInt(CHANNEL_MEMORY_CAPACITY, DEFAULT_CHANNEL_MEMORY_CAPACITY)}.
+     *
+     * @param jobConf job configuration the capacity is read from
+     */
+    @Override
+    public void init(JobProfile jobConf) {
+        int capacity = jobConf.getInt(
+                AgentConstants.CHANNEL_MEMORY_CAPACITY,
+                AgentConstants.DEFAULT_CHANNEL_MEMORY_CAPACITY);
+        queue = new LinkedBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Enqueues a message, waiting for free space if the queue is full.
+     * A null message is ignored. If the thread is interrupted while waiting,
+     * the interrupt flag is restored and the message is not enqueued.
+     *
+     * @param message message to enqueue, may be null
+     */
+    @Override
+    public void push(Message message) {
+        if (message == null) {
+            return;
+        }
+        try {
+            queue.put(message);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Tries to enqueue a message, waiting at most the given time for free space.
+     *
+     * @param message message to enqueue, may be null
+     * @param timeout maximum time to wait
+     * @param unit unit of {@code timeout}
+     * @return true if the message was enqueued; false if it was null, the
+     *         wait timed out, or the thread was interrupted (the interrupt
+     *         flag is restored in that case)
+     */
+    @Override
+    public boolean push(Message message, long timeout, TimeUnit unit) {
+        if (message == null) {
+            return false;
+        }
+        try {
+            return queue.offer(message, timeout, unit);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /**
+     * Takes the next message, waiting at most the given time for one to arrive.
+     *
+     * @param timeout maximum time to wait
+     * @param unit unit of {@code timeout}
+     * @return the next message, or null if none arrived in time
+     * @throws IllegalStateException if the thread is interrupted while
+     *         waiting; the interrupt flag is restored before throwing
+     */
+    @Override
+    public Message pull(long timeout, TimeUnit unit) {
+        try {
+            return queue.poll(timeout, unit);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(interrupted);
+        }
+    }
+
+    /**
+     * Discards every message still queued. Does nothing when the queue was
+     * never created.
+     */
+    @Override
+    public void destroy() {
+        if (queue == null) {
+            return;
+        }
+        queue.clear();
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/BusMessage.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/BusMessage.java
new file mode 100644
index 0000000..989ef81
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/BusMessage.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.message;
+
+import java.util.Map;
+import org.apache.tubemq.agent.plugin.Message;
+
+/**
+ * Bus message: a body and a header, plus the bid and tid values that are
+ * read from the header when the message is constructed.
+ */
+public class BusMessage implements Message {
+
+    private static final String KEY_BID = "bid";
+    private static final String KEY_TID = "tid";
+    private static final String DEFAULT_TID = "__";
+
+    private final byte[] body;
+    private final Map<String, String> header;
+    private final String bid;
+    private final String tid;
+
+    /**
+     * Creates a bus message and looks up bid and tid in the header once.
+     *
+     * @param body message body, stored by reference
+     * @param header message header, stored by reference; it is dereferenced
+     *               here, so a null header fails with a NullPointerException
+     */
+    public BusMessage(byte[] body, Map<String, String> header) {
+        this.body = body;
+        this.header = header;
+        this.bid = header.get(KEY_BID);
+        this.tid = header.getOrDefault(KEY_TID, DEFAULT_TID);
+    }
+
+    /**
+     * Builds a bus message from the body and header of another message.
+     *
+     * @param message source message
+     * @return a new bus message sharing the source's body and header
+     */
+    public static BusMessage parse(Message message) {
+        return new BusMessage(message.getBody(), message.getHeader());
+    }
+
+    /**
+     * Get body of message
+     *
+     * @return the body array passed to the constructor
+     */
+    @Override
+    public byte[] getBody() {
+        return body;
+    }
+
+    /**
+     * Get header of message
+     *
+     * @return the header map passed to the constructor
+     */
+    @Override
+    public Map<String, String> getHeader() {
+        return header;
+    }
+
+    /**
+     * Get the bid captured at construction time
+     *
+     * @return value the header held for "bid", or null if it had none
+     */
+    public String getBid() {
+        return bid;
+    }
+
+    /**
+     * Get the tid captured at construction time
+     *
+     * @return value the header held for "tid", or "__" if the header had
+     *         no "tid" entry
+     */
+    public String getTid() {
+        return tid;
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/DefaultMessage.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/DefaultMessage.java
new file mode 100644
index 0000000..68a3adb
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/DefaultMessage.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.message;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.agent.plugin.Message;
+
+/**
+ * Plain {@link Message} implementation that holds a byte body and a
+ * string-to-string header map. Both are stored by reference; no copies
+ * are made.
+ */
+public class DefaultMessage implements Message {
+
+    private final byte[] body;
+    private final Map<String, String> header;
+
+    /**
+     * Creates a message from the given body and header.
+     *
+     * @param body message body
+     * @param header message header
+     */
+    public DefaultMessage(byte[] body, Map<String, String> header) {
+        this.body = body;
+        this.header = header;
+    }
+
+    /**
+     * Creates a message with the given body and a new, empty header map.
+     *
+     * @param body message body
+     */
+    public DefaultMessage(byte[] body) {
+        this(body, new HashMap<>());
+    }
+
+    /**
+     * Get body of message
+     *
+     * @return the body array passed to the constructor
+     */
+    @Override
+    public byte[] getBody() {
+        return body;
+    }
+
+    /**
+     * Get header of message
+     *
+     * @return the header map held by this message
+     */
+    @Override
+    public Map<String, String> getHeader() {
+        return header;
+    }
+
+    /**
+     * Renders the body as text, decoded as UTF-8.
+     *
+     * @return the body decoded as UTF-8
+     * @throws NullPointerException if the body is null
+     */
+    @Override
+    public String toString() {
+        // Decode with an explicit charset: new String(byte[]) uses the
+        // platform default, which varies between hosts and JVM versions.
+        // Fully qualified so the file's import list stays untouched.
+        return new String(body, java.nio.charset.StandardCharsets.UTF_8);
+    }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/EndMessage.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/EndMessage.java
new file mode 100644
index 0000000..2deb432
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/EndMessage.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.message;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.agent.plugin.Message;
+
+/**
+ * End message: an empty message that only indicates the source data has
+ * been completely consumed.
+ */
+public class EndMessage implements Message {
+
+    /**
+     * Get body of message
+     *
+     * @return always null
+     */
+    @Override
+    public byte[] getBody() {
+        return null;
+    }
+
+    /**
+     * Get header of message
+     *
+     * @return a new, empty map on every call
+     */
+    @Override
+    public Map<String, String> getHeader() {
+        final Map<String, String> emptyHeader = new HashMap<>();
+        return emptyHeader;
+    }
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/AgentMain.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
similarity index 98%
rename from tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/AgentMain.java
rename to tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
index 2b46b0b..119cc0c 100644
--- a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/AgentMain.java
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
@@ -11,7 +11,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tubemq.agent;
+package org.apache.tubemq.agent.core;
 
 import java.util.Iterator;
 import org.apache.commons.cli.CommandLine;