You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/08/30 03:21:23 UTC
[rocketmq-ons] 23/29: (1) Polish existing ONS API (2) Add pull
consumer implementation
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch pullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit f14be542b1f9f02c225cdb179c97b1b517450d44
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jul 17 13:55:00 2019 +0800
(1) Polish existing ONS API (2) Add pull consumer implementation
---
.../java/org/apache/rocketmq/ons/api/Admin.java | 18 +---------------
.../java/org/apache/rocketmq/ons/api/Consumer.java | 5 +----
.../ons/api/{Admin.java => Credentials.java} | 17 +--------------
.../ons/api/{Admin.java => LifeCycle.java} | 15 ++------------
...alTransactionChecker.java => MessageQueue.java} | 9 ++------
.../java/org/apache/rocketmq/ons/api/Producer.java | 14 ++++++-------
.../ons/api/{Admin.java => PullConsumer.java} | 24 +++++++++++++++-------
.../rocketmq/ons/api/batch/BatchConsumer.java | 5 +++--
.../apache/rocketmq/ons/api/bean/ProducerBean.java | 11 ++++++++++
.../ons/api/bean/TransactionProducerBean.java | 4 ++--
.../rocketmq/ons/api/order/OrderConsumer.java | 6 +++---
.../rocketmq/ons/api/order/OrderProducer.java | 11 +++++++---
.../api/transaction/LocalTransactionChecker.java | 2 ++
...Executer.java => LocalTransactionExecutor.java} | 4 ++--
.../ons/api/transaction/TransactionProducer.java | 16 ++++-----------
.../ons/api/impl/rocketmq/ProducerImpl.java | 9 ++++++++
.../api/impl/rocketmq/TransactionProducerImpl.java | 4 ++--
.../sample/producer/SimpleTransactionProducer.java | 5 +++--
18 files changed, 79 insertions(+), 100 deletions(-)
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
index 702d9a5..46f1e79 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
@@ -16,22 +16,6 @@
*/
package org.apache.rocketmq.ons.api;
-import java.util.Properties;
+public interface Admin extends LifeCycle, Credentials {
-
-public interface Admin {
-
- boolean isStarted();
-
-
- boolean isClosed();
-
-
- void start();
-
-
- void updateCredential(Properties credentialProperties);
-
-
- void shutdown();
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java
index a592559..03d8f7c 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Consumer.java
@@ -16,14 +16,11 @@
*/
package org.apache.rocketmq.ons.api;
-
-public interface Consumer extends Admin {
+public interface Consumer extends LifeCycle, Credentials {
void subscribe(final String topic, final String subExpression, final MessageListener listener);
-
void subscribe(final String topic, final MessageSelector selector, final MessageListener listener);
-
void unsubscribe(final String topic);
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Credentials.java
similarity index 88%
copy from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
copy to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Credentials.java
index 702d9a5..08a90c8 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Credentials.java
@@ -15,23 +15,8 @@
* limitations under the License.
*/
package org.apache.rocketmq.ons.api;
-
import java.util.Properties;
-
-public interface Admin {
-
- boolean isStarted();
-
-
- boolean isClosed();
-
-
- void start();
-
-
+public interface Credentials {
void updateCredential(Properties credentialProperties);
-
-
- void shutdown();
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
similarity index 84%
copy from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
copy to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
index 702d9a5..51d96c4 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/LifeCycle.java
@@ -1,4 +1,4 @@
-/*
+package org.apache.rocketmq.ons.api;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,24 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.ons.api;
-
-import java.util.Properties;
-
-
-public interface Admin {
+public interface LifeCycle {
boolean isStarted();
-
boolean isClosed();
-
void start();
-
- void updateCredential(Properties credentialProperties);
-
-
void shutdown();
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
similarity index 81%
copy from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
copy to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
index eb46593..00f8b2d 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/MessageQueue.java
@@ -14,11 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.ons.api.transaction;
-
-import org.apache.rocketmq.ons.api.Message;
-
-public interface LocalTransactionChecker {
-
- TransactionStatus check(final Message msg);
+package org.apache.rocketmq.ons.api;
+public class MessageQueue {
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java
index e45e613..3bd1a0d 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Producer.java
@@ -16,16 +16,10 @@
*/
package org.apache.rocketmq.ons.api;
+import java.util.Collection;
import java.util.concurrent.ExecutorService;
-public interface Producer extends Admin {
-
-
- @Override
- void start();
-
- @Override
- void shutdown();
+public interface Producer extends LifeCycle, Credentials {
SendResult send(final Message message);
@@ -34,4 +28,8 @@ public interface Producer extends Admin {
void sendAsync(final Message message, final SendCallback sendCallback);
void setCallbackExecutor(final ExecutorService callbackExecutor);
+
+ SendResult send(final Message message, final String shardingKey);
+
+ SendResult send(final Collection<Message> messages);
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
similarity index 54%
copy from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
copy to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
index 702d9a5..abdd966 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/Admin.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/PullConsumer.java
@@ -16,22 +16,32 @@
*/
package org.apache.rocketmq.ons.api;
-import java.util.Properties;
+import java.util.Collection;
+import java.util.List;
+public interface PullConsumer {
-public interface Admin {
+ void subscribe(final Collection<String> topics);
- boolean isStarted();
+ void subscribe(final Collection<String> topics, final String subExpression);
+ void unsubscribe(final Collection<String> topics);
- boolean isClosed();
+ List<Message> poll(long timeout);
+ void seek(MessageQueue messageQueue, long offset);
- void start();
+ void seekToBeginning(MessageQueue messageQueue);
+ void seekToEnd(MessageQueue messageQueuea);
- void updateCredential(Properties credentialProperties);
+ void pause(Collection<MessageQueue> messageQueues);
+ void resume(Collection<MessageQueue> partitions);
- void shutdown();
+ Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp);
+
+ Long committed(MessageQueue messageQueue);
+
+ void commitSync();
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java
index 848a139..d5887bc 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/batch/BatchConsumer.java
@@ -17,9 +17,10 @@
package org.apache.rocketmq.ons.api.batch;
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
-public interface BatchConsumer extends Admin {
+public interface BatchConsumer extends LifeCycle, Credentials {
void subscribe(final String topic, final String subExpression, final BatchMessageListener listener);
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
index 2094158..548cc82 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/ProducerBean.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.ons.api.bean;
+import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.ons.api.Message;
@@ -95,4 +96,14 @@ public class ProducerBean implements Producer {
public boolean isClosed() {
return this.producer.isClosed();
}
+
+ @Override public SendResult send(Message message, String shardingKey) {
+ //TODO
+ return null;
+ }
+
+ @Override public SendResult send(Collection<Message> messages) {
+ //TODO
+ return null;
+ }
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
index cb55e14..ddf513c 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/bean/TransactionProducerBean.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.ons.api.ONSFactory;
import org.apache.rocketmq.ons.api.SendResult;
import org.apache.rocketmq.ons.api.exception.ONSClientException;
import org.apache.rocketmq.ons.api.transaction.LocalTransactionChecker;
-import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter;
+import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecutor;
import org.apache.rocketmq.ons.api.transaction.TransactionProducer;
@@ -63,7 +63,7 @@ public class TransactionProducerBean implements TransactionProducer {
}
@Override
- public SendResult send(Message message, LocalTransactionExecuter executer, Object arg) {
+ public SendResult send(Message message, LocalTransactionExecutor executer, Object arg) {
return this.transactionProducer.send(message, executer, arg);
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java
index 4e73c1b..09b9127 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderConsumer.java
@@ -17,11 +17,11 @@
package org.apache.rocketmq.ons.api.order;
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
import org.apache.rocketmq.ons.api.MessageSelector;
-
-public interface OrderConsumer extends Admin {
+public interface OrderConsumer extends LifeCycle, Credentials {
@Override
void start();
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java
index 4e9a0bb..25df1d0 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/order/OrderProducer.java
@@ -17,12 +17,17 @@
package org.apache.rocketmq.ons.api.order;
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
import org.apache.rocketmq.ons.api.Message;
+import org.apache.rocketmq.ons.api.Producer;
import org.apache.rocketmq.ons.api.SendResult;
-
-public interface OrderProducer extends Admin {
+/**
+ * This interface will be removed in the year 2021, {@link Producer#send(Message, String)} is recommended
+ */
+@Deprecated
+public interface OrderProducer extends LifeCycle, Credentials {
SendResult send(final Message message, final String shardingKey);
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
index eb46593..cb3aa0a 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionChecker.java
@@ -21,4 +21,6 @@ import org.apache.rocketmq.ons.api.Message;
public interface LocalTransactionChecker {
TransactionStatus check(final Message msg);
+
+
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecutor.java
similarity index 88%
rename from ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java
rename to ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecutor.java
index f3fe785..fad6493 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecuter.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/LocalTransactionExecutor.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.ons.api.transaction;
import org.apache.rocketmq.ons.api.Message;
-public interface LocalTransactionExecuter {
+public interface LocalTransactionExecutor {
- TransactionStatus execute(final Message msg, final Object arg);
+ TransactionStatus execute(final Message message, final Object arg);
}
diff --git a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java
index c139db1..f9878a4 100644
--- a/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java
+++ b/ons-core/ons-api/src/main/java/org/apache/rocketmq/ons/api/transaction/TransactionProducer.java
@@ -17,22 +17,14 @@
package org.apache.rocketmq.ons.api.transaction;
-import org.apache.rocketmq.ons.api.Admin;
+import org.apache.rocketmq.ons.api.Credentials;
+import org.apache.rocketmq.ons.api.LifeCycle;
import org.apache.rocketmq.ons.api.Message;
import org.apache.rocketmq.ons.api.SendResult;
-
-public interface TransactionProducer extends Admin {
-
- @Override
- void start();
-
-
- @Override
- void shutdown();
-
+public interface TransactionProducer extends LifeCycle, Credentials {
SendResult send(final Message message,
- final LocalTransactionExecuter executer,
+ final LocalTransactionExecutor executer,
final Object arg);
}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
index cfbe212..2b6280a 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.ons.api.impl.rocketmq;
+import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -219,6 +220,14 @@ public class ProducerImpl extends ONSClientAbstract implements Producer {
return sendResult;
}
+ @Override public SendResult send(Message message, String shardingKey) {
+ return null;
+ }
+
+ @Override public SendResult send(Collection<Message> messages) {
+ return null;
+ }
+
private ONSClientException checkProducerException(String topic, String msgId, Throwable e) {
if (e instanceof MQClientException) {
if (e.getCause() != null) {
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
index 13ed7e4..abcf954 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.ons.api.PropertyKeyConst;
import org.apache.rocketmq.ons.api.SendResult;
import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl;
import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
-import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter;
+import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecutor;
import org.apache.rocketmq.ons.api.transaction.TransactionProducer;
import org.apache.rocketmq.ons.api.transaction.TransactionStatus;
import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
@@ -123,7 +123,7 @@ public class TransactionProducerImpl extends ONSClientAbstract implements Transa
}
@Override
- public SendResult send(final Message message, final LocalTransactionExecuter executer, Object arg) {
+ public SendResult send(final Message message, final LocalTransactionExecutor executer, Object arg) {
this.checkONSProducerServiceState(this.transactionMQProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);
org.apache.rocketmq.client.producer.TransactionSendResult sendResultRMQ = null;
diff --git a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
index 4d16d51..992bd3b 100644
--- a/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
+++ b/ons-sample/src/main/java/org/apache/rocketmq/ons/sample/producer/SimpleTransactionProducer.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.ons.api.ONSFactory;
import org.apache.rocketmq.ons.api.PropertyKeyConst;
import org.apache.rocketmq.ons.api.SendResult;
import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecuter;
+import org.apache.rocketmq.ons.api.transaction.LocalTransactionExecutor;
import org.apache.rocketmq.ons.api.transaction.TransactionProducer;
import org.apache.rocketmq.ons.api.transaction.TransactionStatus;
import org.apache.rocketmq.ons.sample.MQConfig;
@@ -44,12 +44,13 @@ public class SimpleTransactionProducer {
for (int i = 0; i < 10; i++) {
try {
- SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecuter() {
+ SendResult sendResult = transactionProducer.send(message, new LocalTransactionExecutor() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.printf("Execute local transaction and return TransactionStatus. %n");
return TransactionStatus.CommitTransaction;
}
+
}, null);
assert sendResult != null;
} catch (ONSClientException e) {