You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/05/29 17:38:25 UTC
[nifi] branch master updated: NIFI-6701 - Fix for PublishGCPPubSub
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 8162edf NIFI-6701 - Fix for PublishGCPPubSub
8162edf is described below
commit 8162edfd9891159cc1636c4f496de911ee752b9c
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Fri May 29 12:11:58 2020 +0200
NIFI-6701 - Fix for PublishGCPPubSub
---
.../apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java | 1 -
.../apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java | 14 ++++----------
.../nifi-gcp-bundle/nifi-gcp-services-api/pom.xml | 1 -
nifi-nar-bundles/nifi-gcp-bundle/pom.xml | 4 ++--
4 files changed, 6 insertions(+), 14 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 23aaff0..9440a43 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -98,7 +98,6 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
pullRequest = PullRequest.newBuilder()
.setMaxMessages(batchSize)
- .setReturnImmediately(false)
.setSubscription(getSubscriptionName(context))
.build();
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 79e8614..3e8e0f8 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -121,6 +121,7 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
);
}
+ @Override
@OnScheduled
public void onScheduled(ProcessContext context) {
try {
@@ -162,14 +163,8 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
ApiFuture<String> messageIdFuture = publisher.publish(message);
- while (messageIdFuture.isDone()) {
- Thread.sleep(500L);
- }
-
- final String messageId = messageIdFuture.get();
final Map<String, String> attributes = new HashMap<>();
-
- attributes.put(MESSAGE_ID_ATTRIBUTE, messageId);
+ attributes.put(MESSAGE_ID_ATTRIBUTE, messageIdFuture.get());
attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
flowFile = session.putAllAttributes(flowFile, attributes);
@@ -180,11 +175,10 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
"so routing to retry", new Object[]{topicName, e.getLocalizedMessage()}, e);
session.transfer(flowFile, REL_RETRY);
} else {
- getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {}",
- new Object[]{topicName, e});
+ getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {}", new Object[]{topicName, e});
session.transfer(flowFile, REL_FAILURE);
- context.yield();
}
+ context.yield();
}
}
} finally {
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index e4f700d..c41c43e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -33,7 +33,6 @@
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
- <version>0.18.0</version>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
index a0da073..d7735f1 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
@@ -27,13 +27,13 @@
<packaging>pom</packaging>
<properties>
- <google.cloud.sdk.version>0.120.2-alpha</google.cloud.sdk.version>
+ <google.cloud.sdk.version>0.125.0</google.cloud.sdk.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
- <!-- https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-bom -->
+ <!-- https://github.com/googleapis/java-cloud-bom -->
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bom</artifactId>
<version>${google.cloud.sdk.version}</version>