You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/05/29 17:38:25 UTC

[nifi] branch master updated: NIFI-6701 - Fix for PublishGCPPubSub

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8162edf  NIFI-6701 - Fix for PublishGCPPubSub
8162edf is described below

commit 8162edfd9891159cc1636c4f496de911ee752b9c
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Fri May 29 12:11:58 2020 +0200

    NIFI-6701 - Fix for PublishGCPPubSub
---
 .../apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java |  1 -
 .../apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java | 14 ++++----------
 .../nifi-gcp-bundle/nifi-gcp-services-api/pom.xml          |  1 -
 nifi-nar-bundles/nifi-gcp-bundle/pom.xml                   |  4 ++--
 4 files changed, 6 insertions(+), 14 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 23aaff0..9440a43 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -98,7 +98,6 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
 
         pullRequest = PullRequest.newBuilder()
                 .setMaxMessages(batchSize)
-                .setReturnImmediately(false)
                 .setSubscription(getSubscriptionName(context))
                 .build();
 
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 79e8614..3e8e0f8 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -121,6 +121,7 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
         );
     }
 
+    @Override
     @OnScheduled
     public void onScheduled(ProcessContext context) {
         try {
@@ -162,14 +163,8 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
 
                     ApiFuture<String> messageIdFuture = publisher.publish(message);
 
-                    while (messageIdFuture.isDone()) {
-                        Thread.sleep(500L);
-                    }
-
-                    final String messageId = messageIdFuture.get();
                     final Map<String, String> attributes = new HashMap<>();
-
-                    attributes.put(MESSAGE_ID_ATTRIBUTE, messageId);
+                    attributes.put(MESSAGE_ID_ATTRIBUTE, messageIdFuture.get());
                     attributes.put(TOPIC_NAME_ATTRIBUTE, topicName);
 
                     flowFile = session.putAllAttributes(flowFile, attributes);
@@ -180,11 +175,10 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
                                         "so routing to retry", new Object[]{topicName, e.getLocalizedMessage()}, e);
                         session.transfer(flowFile, REL_RETRY);
                     } else {
-                        getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {}",
-                                new Object[]{topicName, e});
+                        getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {}", new Object[]{topicName, e});
                         session.transfer(flowFile, REL_FAILURE);
-                        context.yield();
                     }
+                    context.yield();
                 }
             }
         } finally {
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index e4f700d..c41c43e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -33,7 +33,6 @@
         <dependency>
             <groupId>com.google.auth</groupId>
             <artifactId>google-auth-library-oauth2-http</artifactId>
-            <version>0.18.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
index a0da073..d7735f1 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
@@ -27,13 +27,13 @@
     <packaging>pom</packaging>
 
     <properties>
-        <google.cloud.sdk.version>0.120.2-alpha</google.cloud.sdk.version>
+        <google.cloud.sdk.version>0.125.0</google.cloud.sdk.version>
     </properties>
 
     <dependencyManagement>
         <dependencies>
             <dependency>
-            	<!-- https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-bom -->
+            	<!-- https://github.com/googleapis/java-cloud-bom -->
                 <groupId>com.google.cloud</groupId>
                 <artifactId>google-cloud-bom</artifactId>
                 <version>${google.cloud.sdk.version}</version>