You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/05/26 19:56:07 UTC
[nifi] branch main updated: NIFI-8631: Ensure that GCP Pub/Sub
messages are not acknowledged until session has been committed,
in order to ensure that we don't have data loss
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 46b1f67 NIFI-8631: Ensure that GCP Pub/Sub messages are not acknowledged until session has been committed, in order to ensure that we don't have data loss
46b1f67 is described below
commit 46b1f6755c5ca3cc4bdb55e48fd09e1216b66d71
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed May 26 11:34:51 2021 -0400
NIFI-8631: Ensure that GCP Pub/Sub messages are not acknowledged until session has been committed, in order to ensure that we don't have data loss
This closes #5102.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../processors/gcp/pubsub/ConsumeGCPubSub.java | 32 +++++++++++++---------
1 file changed, 19 insertions(+), 13 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 5693721..70b9e26 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -130,11 +131,10 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
}
@Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (subscriber == null) {
-
if (storedException.get() != null) {
- getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", new Object[]{storedException.get()});
+ getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", storedException.get());
} else {
getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
}
@@ -145,6 +145,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
final List<String> ackIds = new ArrayList<>();
+ final String subscriptionName = getSubscriptionName(context);
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
if (message.hasMessage()) {
@@ -164,20 +165,26 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray()));
session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().receive(flowFile, getSubscriptionName(context));
+ session.getProvenanceReporter().receive(flowFile, subscriptionName);
}
}
- if (!ackIds.isEmpty()) {
- AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
- .addAllAckIds(ackIds)
- .setSubscription(getSubscriptionName(context))
- .build();
- subscriber.acknowledgeCallable().call(acknowledgeRequest);
+ session.commitAsync(() -> acknowledgeAcks(ackIds, subscriptionName));
+ }
+
+ private void acknowledgeAcks(final Collection<String> ackIds, final String subscriptionName) {
+ if (ackIds == null || ackIds.isEmpty()) {
+ return;
}
+
+ AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
+ .addAllAckIds(ackIds)
+ .setSubscription(subscriptionName)
+ .build();
+ subscriber.acknowledgeCallable().call(acknowledgeRequest);
}
- private String getSubscriptionName(ProcessContext context) {
+ private String getSubscriptionName(final ProcessContext context) {
final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
@@ -189,8 +196,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
}
- private SubscriberStub getSubscriber(ProcessContext context) throws IOException {
-
+ private SubscriberStub getSubscriber(final ProcessContext context) throws IOException {
final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.build();