You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/12/24 10:53:37 UTC
[incubator-tubemq] branch TUBEMQ-455 updated: [TUBEMQ-479] add
common channels and messages for agent
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch TUBEMQ-455
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-455 by this push:
new 8716317 [TUBEMQ-479] add common channels and messages for agent
8716317 is described below
commit 87163178da59c8e8683a350f8258d24beed41214
Author: yuanbo <yu...@apache.org>
AuthorDate: Thu Dec 24 16:18:38 2020 +0800
[TUBEMQ-479] add common channels and messages for agent
---
.../apache/tubemq/agent/channel/MemoryChannel.java | 80 ++++++++++++++++++++++
.../apache/tubemq/agent/message/BusMessage.java | 70 +++++++++++++++++++
.../tubemq/agent/message/DefaultMessage.java | 48 +++++++++++++
.../apache/tubemq/agent/message/EndMessage.java | 35 ++++++++++
.../apache/tubemq/agent/{ => core}/AgentMain.java | 2 +-
5 files changed, 234 insertions(+), 1 deletion(-)
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/channel/MemoryChannel.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/channel/MemoryChannel.java
new file mode 100644
index 0000000..87d7a64
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/channel/MemoryChannel.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.channel;
+
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.tubemq.agent.conf.JobProfile;
+import org.apache.tubemq.agent.constants.AgentConstants;
+import org.apache.tubemq.agent.plugin.Channel;
+import org.apache.tubemq.agent.plugin.Message;
+
+public class MemoryChannel implements Channel {
+
+ private LinkedBlockingQueue<Message> queue;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void push(Message message) {
+ try {
+ if (message != null) {
+ queue.put(message);
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public boolean push(Message message, long timeout, TimeUnit unit) {
+ try {
+ if (message != null) {
+ return queue.offer(message, timeout, unit);
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Message pull(long timeout, TimeUnit unit) {
+ try {
+ return queue.poll(timeout, unit);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ @Override
+ public void init(JobProfile jobConf) {
+ queue = new LinkedBlockingQueue<>(
+ jobConf.getInt(
+ AgentConstants.CHANNEL_MEMORY_CAPACITY, AgentConstants.DEFAULT_CHANNEL_MEMORY_CAPACITY));
+ }
+
+ @Override
+ public void destroy() {
+ if (queue != null) {
+ queue.clear();
+ }
+ }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/BusMessage.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/BusMessage.java
new file mode 100644
index 0000000..989ef81
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/BusMessage.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.message;
+
+import java.util.Map;
+import org.apache.tubemq.agent.plugin.Message;
+
+/**
+ * Bus message with body, header, bid and tid.
+ */
+public class BusMessage implements Message {
+
+ private static final String DEFAULT_TID = "__";
+
+ private final byte[] body;
+ private final Map<String, String> header;
+ private final String bid;
+ private final String tid;
+
+
+ public BusMessage(byte[] body, Map<String, String> header) {
+ this.body = body;
+ this.header = header;
+ this.bid = header.get("bid");
+ this.tid = header.getOrDefault("tid", DEFAULT_TID);
+ }
+
+ /**
+ * Get first line of body list
+ *
+ * @return first line of body list
+ */
+ @Override
+ public byte[] getBody() {
+ return body;
+ }
+
+ /**
+ * Get header of message
+ *
+ * @return header
+ */
+ @Override
+ public Map<String, String> getHeader() {
+ return header;
+ }
+
+ public String getBid() {
+ return bid;
+ }
+
+ public String getTid() {
+ return tid;
+ }
+
+ public static BusMessage parse(Message message) {
+ return new BusMessage(message.getBody(), message.getHeader());
+ }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/DefaultMessage.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/DefaultMessage.java
new file mode 100644
index 0000000..68a3adb
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/DefaultMessage.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.message;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.agent.plugin.Message;
+
+public class DefaultMessage implements Message {
+
+ private final byte[] body;
+ private final Map<String, String> header;
+
+ public DefaultMessage(byte[] body, Map<String, String> header) {
+ this.body = body;
+ this.header = header;
+ }
+
+ public DefaultMessage(byte[] body) {
+ this(body, new HashMap<>());
+ }
+
+ @Override
+ public byte[] getBody() {
+ return body;
+ }
+
+ @Override
+ public Map<String, String> getHeader() {
+ return header;
+ }
+
+ @Override
+ public String toString() {
+ return new String(body);
+ }
+}
diff --git a/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/EndMessage.java b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/EndMessage.java
new file mode 100644
index 0000000..2deb432
--- /dev/null
+++ b/tubemq-agent/agent-common/src/main/java/org/apache/tubemq/agent/message/EndMessage.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tubemq.agent.message;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.tubemq.agent.plugin.Message;
+
+/**
+ * End message, this is an empty message only indicate
+ * that source data have been completely consumed.
+ */
+public class EndMessage implements Message {
+
+ @Override
+ public byte[] getBody() {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> getHeader() {
+ return new HashMap<>();
+ }
+}
diff --git a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/AgentMain.java b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
similarity index 98%
rename from tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/AgentMain.java
rename to tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
index 2b46b0b..119cc0c 100644
--- a/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/AgentMain.java
+++ b/tubemq-agent/agent-core/src/main/java/org/apache/tubemq/agent/core/AgentMain.java
@@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tubemq.agent;
+package org.apache.tubemq.agent.core;
import java.util.Iterator;
import org.apache.commons.cli.CommandLine;