You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2020/03/14 20:00:23 UTC
[ignite-extensions] branch master updated: Added acknowledgement
for pub/sub Streamer - Fixes #8.
This is an automated email from the ASF dual-hosted git repository.
samaitra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 173f116 Added acknowledgement for pub/sub Streamer - Fixes #8.
173f116 is described below
commit 173f1166404eb017162a124d26df6bcb5fd99ba5
Author: gkatzioura <gk...@gmail.com>
AuthorDate: Sat Mar 14 15:00:34 2020 -0500
Added acknowledgement for pub/sub Streamer - Fixes #8.
Signed-off-by: samaitra <sa...@gmail.com>
---
.../ignite/stream/pubsub/PubSubStreamer.java | 10 ++++++++
.../ignite/stream/pubsub/MockPubSubServer.java | 28 ++++++++++++++++++----
2 files changed, 34 insertions(+), 4 deletions(-)
diff --git a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
index 13384d2..3a4b689 100644
--- a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
+++ b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
@@ -35,6 +35,7 @@ import org.apache.ignite.stream.StreamAdapter;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
@@ -224,9 +225,18 @@ public class PubSubStreamer<K,V> extends StreamAdapter<PubsubMessage, K, V> {
PullResponse pullResponse = subscriberStub.pullCallable().call(pullRequest);
+ List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
addMessage(message.getMessage());
+ ackIds.add(message.getAckId());
}
+
+ AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
+ .setSubscription(subscriptionName)
+ .addAllAckIds(ackIds)
+ .build();
+
+ subscriberStub.acknowledgeCallable().call(acknowledgeRequest);
}
} finally {
subscriberStub.close();
diff --git a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
index 20fe767..714961c 100644
--- a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
+++ b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
@@ -24,15 +24,15 @@ import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.protobuf.Empty;
+import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
@@ -68,7 +68,6 @@ class MockPubSubServer {
public static final int MESSAGES_PER_REQUEST = 10;
private final Map<String, Publisher> publishers = new HashMap<>();
- private final List<PubsubMessage> topicMessages = new ArrayList<>();
private final Queue<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
public SubscriberStubSettings createSubscriberStub() throws IOException {
@@ -87,10 +86,31 @@ class MockPubSubServer {
@NotNull
private ManagedChannel managedChannel() {
ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class);
- when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la) -> clientCall());
+ when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la) -> {
+ MethodDescriptor methodDescriptor = (MethodDescriptor) la.getArguments()[0];
+ if(methodDescriptor.getFullMethodName().equals("google.pubsub.v1.Subscriber/Acknowledge")) {
+ return acknowledgeCall();
+ }
+
+ return clientCall();
+ });
return managedChannel;
}
+ private ClientCall<AcknowledgeRequest, Empty> acknowledgeCall() {
+ ClientCall<AcknowledgeRequest, Empty> clientCall = Mockito.mock(ClientCall.class);
+ doAnswer(iom -> {
+ Object[] arguments = iom.getArguments();
+ ClientCall.Listener<Empty> listener = (ClientCall.Listener<Empty>) arguments[0];
+ listener.onMessage(Empty.getDefaultInstance());
+ Metadata metadata = (Metadata) arguments[1];
+ listener.onClose(Status.OK, metadata);
+ return null;
+ }
+ ).when(clientCall).start(any(ClientCall.Listener.class),any(Metadata.class));
+ return clientCall;
+ }
+
private ClientCall<PullRequest, PullResponse> clientCall() {
ClientCall<PullRequest, PullResponse> clientCall = Mockito.mock(ClientCall.class);