You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/08/30 03:21:23 UTC

[rocketmq-ons] 23/29: (1) Polish existed ons api (2) Add pull consumer Implementation

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch pullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit f14be542b1f9f02c225cdb179c97b1b517450d44
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jul 17 13:55:00 2019 +0800

    (1) Polish existed ons api (2) Add pull consumer Implementation
---
 .../java/org/apache/rocketmq/ons/api/Admin.java    | 18 +---------------
 .../java/org/apache/rocketmq/ons/api/Consumer.java |  5 +----
 .../ons/api/{Admin.java => Credentials.java}       | 17 +--------------
 .../ons/api/{Admin.java => LifeCycle.java}         | 15 ++------------
 ...alTransactionChecker.java => MessageQueue.java} |  9 ++------
 .../java/org/apache/rocketmq/ons/api/Producer.java | 14 ++++++-------
 .../ons/api/{Admin.java => PullConsumer.java}      | 24 +++++++++++++++-------
 .../rocketmq/ons/api/batch/BatchConsumer.java      |  5 +++--
 .../apache/rocketmq/ons/api/bean/ProducerBean.java | 11 ++++++++++
 .../ons/api/bean/TransactionProducerBean.java      |  4 ++--
 .../rocketmq/ons/api/order/OrderConsumer.java      |  6 +++---
 .../rocketmq/ons/api/order/OrderProducer.java      | 11 +++++++---
 .../api/transaction/LocalTransactionChecker.java   |  2 ++
 ...Executer.java => LocalTransactionExecutor.java} |  4 ++--
 .../ons/api/transaction/TransactionProducer.java   | 16 ++++-----------
 .../ons/api/impl/rocketmq/ProducerImpl.java        |  9 ++++++++
 .../api/impl/rocketmq/TransactionProducerImpl.java |  4 ++--
 .../sample/producer/SimpleTransactionProducer.java |  5 +++--
 18 files changed, 79 insertions(+), 100 deletions(-)

diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
index 702d9a5..46f1e79 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
@@ -16,22 +16,6 @@
  */
 package org.apache.rocketmq.ons.api;
 
-import java.util.Properties;
+public interface Admin extends LifeCycle, Credentials {
 
-
-public interface Admin {
-
-    boolean isStarted();
-
-
-    boolean isClosed();
-
-
-    void start();
-
-
-    void updateCredential(Properties credentialProperties);
-
-
-    void shutdown();
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java
index a592559..03d8f7c 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java
@@ -16,14 +16,11 @@
  */
 package org.apache.rocketmq.ons.api;
 
-
-public interface Consumer extends Admin {
+public interface Consumer extends LifeCycle, Credentials {
 
     void subscribe(final String topic, final String subExpression, final MessageListener listener);
 
-
     void subscribe(final String topic, final MessageSelector selector, final MessageListener listener);
 
-
     void unsubscribe(final String topic);
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Credentials.java
similarity index 88%
copy from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
copy to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Credentials.java
index 702d9a5..08a90c8 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Credentials.java
@@ -15,23 +15,8 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.ons.api;
-
 import java.util.Properties;
 
-
-public interface Admin {
-
-    boolean isStarted();
-
-
-    boolean isClosed();
-
-
-    void start();
-
-
+public interface Credentials {
     void updateCredential(Properties credentialProperties);
-
-
-    void shutdown();
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
similarity index 84%
copy from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
copy to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
index 702d9a5..51d96c4 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
@@ -1,4 +1,4 @@
-/*
+package org.apache.rocketmq.ons.api;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,24 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.ons.api;
-
-import java.util.Properties;
-
-
-public interface Admin {
 
+public interface LifeCycle {
     boolean isStarted();
 
-
     boolean isClosed();
 
-
     void start();
 
-
-    void updateCredential(Properties credentialProperties);
-
-
     void shutdown();
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
similarity index 81%
copy from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
copy to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
index eb46593..00f8b2d 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
@@ -14,11 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.ons.api.transaction;
-
-import org.apache.rocketmq.ons.api.Message;
-
-public interface LocalTransactionChecker {
-
-    TransactionStatus check(final Message msg);
+package org.apache.rocketmq.ons.api;
+public class MessageQueue {
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java
index e45e613..3bd1a0d 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java
@@ -16,16 +16,10 @@
  */
 package org.apache.rocketmq.ons.api;
 
+import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 
-public interface Producer extends Admin {
-
-
-    @Override
-    void start();
-
-    @Override
-    void shutdown();
+public interface Producer extends LifeCycle, Credentials {
 
     SendResult send(final Message message);
 
@@ -34,4 +28,8 @@ public interface Producer extends Admin {
     void sendAsync(final Message message, final SendCallback sendCallback);
 
     void setCallbackExecutor(final ExecutorService callbackExecutor);
+
+    SendResult send(final Message message, final String shardingKey);
+
+    SendResult send(final Collection<Message> messages);
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
similarity index 54%
copy from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
copy to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
index 702d9a5..abdd966 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
@@ -16,22 +16,32 @@
  */
 package org.apache.rocketmq.ons.api;
 
-import java.util.Properties;
+import java.util.Collection;
+import java.util.List;
 
+public interface PullConsumer {
 
-public interface Admin {
+    void subscribe(final Collection<String> topics);
 
-    boolean isStarted();
+    void subscribe(final Collection<String> topics, final String subExpression);
 
+    void unsubscribe(final Collection<String> topics);
 
-    boolean isClosed();
+    List<Message> poll(long timeout);
 
+    void seek(MessageQueue messageQueue, long offset);
 
-    void start();
+    void seekToBeginning(MessageQueue messageQueue);
 
+    void seekToEnd(MessageQueue messageQueuea);
 
-    void updateCredential(Properties credentialProperties);
+    void pause(Collection<MessageQueue> messageQueues);
 
+    void resume(Collection<MessageQueue> partitions);
 
-    void shutdown();
+    Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp);
+
+    Long committed(MessageQueue messageQueue);
+
+    void commitSync();
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java
index 848a139..d5887bc 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java
@@ -17,9 +17,10 @@
 
 package org.apache.rocketmq.ons.api.batch;
 
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
 
-public interface BatchConsumer extends Admin {
+public interface BatchConsumer extends LifeCycle, Credentials {
 
     void subscribe(final String topic, final String subExpression, final BatchMessageListener listener);
 
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
index 2094158..548cc82 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.ons.api.bean;
 
+import java.util.Collection;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import org.apache.rocketmq.ons.api.Message;
@@ -95,4 +96,14 @@ public class ProducerBean implements Producer {
     public boolean isClosed() {
         return this.producer.isClosed();
     }
+
+    @Override public SendResult send(Message message, String shardingKey) {
+        //TODO
+        return null;
+    }
+
+    @Override public SendResult send(Collection<Message> messages) {
+        //TODO
+        return null;
+    }
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
index cb55e14..ddf513c 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.ons.api.ONSFactory;
 import org.apache.rocketmq.ons.api.SendResult;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
 import org.apache.rocketmq.ons.api.transaction.LocalTransactionChecker;
-import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter;
+import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecutor;
 import org.apache.rocketmq.ons.api.transaction.TransactionProducer;
 
 
@@ -63,7 +63,7 @@ public class TransactionProducerBean implements TransactionProducer {
     }
 
     @Override
-    public SendResult send(Message message, LocalTransactionExecuter executer, Object arg) {
+    public SendResult send(Message message, LocalTransactionExecutor executer, Object arg) {
         return this.transactionProducer.send(message, executer, arg);
     }
 
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java
index 4e73c1b..09b9127 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java
@@ -17,11 +17,11 @@
 
 package org.apache.rocketmq.ons.api.order;
 
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
 import org.apache.rocketmq.ons.api.MessageSelector;
 
-
-public interface OrderConsumer extends Admin {
+public interface OrderConsumer extends LifeCycle, Credentials {
 
     @Override
     void start();
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java
index 4e9a0bb..25df1d0 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java
@@ -17,12 +17,17 @@
 
 package org.apache.rocketmq.ons.api.order;
 
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
 import org.apache.rocketmq.ons.api.Message;
+import org.apache.rocketmq.ons.api.Producer;
 import org.apache.rocketmq.ons.api.SendResult;
 
-
-public interface OrderProducer extends Admin {
+/**
+ * This interface will be removed in the year 2021, {@link Producer#send(Message, String)} is recommended
+ */
+@Deprecated
+public interface OrderProducer extends LifeCycle, Credentials {
 
     SendResult send(final Message message, final String shardingKey);
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
index eb46593..cb3aa0a 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
@@ -21,4 +21,6 @@ import org.apache.rocketmq.ons.api.Message;
 public interface LocalTransactionChecker {
 
     TransactionStatus check(final Message msg);
+
+
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecutor.java
similarity index 88%
rename from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java
rename to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecutor.java
index f3fe785..fad6493 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecutor.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.ons.api.transaction;
 import org.apache.rocketmq.ons.api.Message;
 
 
-public interface LocalTransactionExecuter {
+public interface LocalTransactionExecutor {
 
-    TransactionStatus execute(final Message msg, final Object arg);
+    TransactionStatus execute(final Message message, final Object arg);
 }
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java
index c139db1..f9878a4 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java
@@ -17,22 +17,14 @@
 
 package org.apache.rocketmq.ons.api.transaction;
 
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
 import org.apache.rocketmq.ons.api.Message;
 import org.apache.rocketmq.ons.api.SendResult;
 
-
-public interface TransactionProducer extends Admin {
-
-    @Override
-    void start();
-
-
-    @Override
-    void shutdown();
-
+public interface TransactionProducer extends LifeCycle, Credentials {
 
     SendResult send(final Message message,
-        final LocalTransactionExecuter executer,
+        final LocalTransactionExecutor executer,
         final Object arg);
 }
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
index cfbe212..2b6280a 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.ons.api.impl.rocketmq;
 
+import java.util.Collection;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
@@ -219,6 +220,14 @@ public class ProducerImpl extends ONSClientAbstract implements Producer {
         return sendResult;
     }
 
+    @Override public SendResult send(Message message, String shardingKey) {
+        return null;
+    }
+
+    @Override public SendResult send(Collection<Message> messages) {
+        return null;
+    }
+
     private ONSClientException checkProducerException(String topic, String msgId, Throwable e) {
         if (e instanceof MQClientException) {
             if (e.getCause() != null) {
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
index 13ed7e4..abcf954 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.api.SendResult;
 import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl;
 import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
-import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter;
+import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecutor;
 import org.apache.rocketmq.ons.api.transaction.TransactionProducer;
 import org.apache.rocketmq.ons.api.transaction.TransactionStatus;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
@@ -123,7 +123,7 @@ public class TransactionProducerImpl extends ONSClientAbstract implements Transa
     }
 
     @Override
-    public SendResult send(final Message message, final LocalTransactionExecuter executer, Object arg) {
+    public SendResult send(final Message message, final LocalTransactionExecutor executer, Object arg) {
         this.checkONSProducerServiceState(this.transactionMQProducer.getDefaultMQProducerImpl());
         org.apache.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);
         org.apache.rocketmq.client.producer.TransactionSendResult sendResultRMQ = null;
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
index 4d16d51..992bd3b 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.ons.api.ONSFactory;
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.api.SendResult;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter;
+import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecutor;
 import org.apache.rocketmq.ons.api.transaction.TransactionProducer;
 import org.apache.rocketmq.ons.api.transaction.TransactionStatus;
 import org.apache.rocketmq.ons.sample.MQConfig;
@@ -44,12 +44,13 @@ public class SimpleTransactionProducer {
 
         for (int i = 0; i < 10; i++) {
             try {
-                SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecuter() {
+                SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecutor() {
                     @Override
                     public TransactionStatus execute(Message msg, Object arg) {
                         System.out.printf("Execute local transaction and return TransactionStatus. %n");
                         return TransactionStatus.CommitTransaction;
                     }
+
                 }, null);
                 assert sendResult != null;
             } catch (ONSClientException e) {