You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/02/03 19:13:49 UTC

[nifi] branch main updated: NIFI-11092 Fixed ConsumeTwitter handling on Primary Node changes

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b61a6226c NIFI-11092 Fixed ConsumeTwitter handling on Primary Node changes
0b61a6226c is described below

commit 0b61a6226c359fb97d098eb4c536294f04a4255a
Author: Emilio Setiadarma <em...@gmail.com>
AuthorDate: Thu Jan 26 15:22:19 2023 -0800

    NIFI-11092 Fixed ConsumeTwitter handling on Primary Node changes
    
    This closes #6901
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi/processors/twitter/ConsumeTwitter.java    | 45 ++++++++++++++++++----
 .../processors/twitter/TweetStreamService.java     |  8 ++--
 2 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java
index 041fd80dba..2517456d74 100644
--- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java
@@ -26,6 +26,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
@@ -49,6 +51,7 @@ import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 @PrimaryNodeOnly
@@ -255,6 +258,8 @@ public class ConsumeTwitter extends AbstractProcessor {
 
     private volatile BlockingQueue<String> messageQueue;
 
+    private final AtomicBoolean streamStarted = new AtomicBoolean(false);
+
     @Override
     protected void init(ProcessorInitializationContext context) {
         final List<PropertyDescriptor> descriptors = new ArrayList<>();
@@ -296,13 +301,13 @@ public class ConsumeTwitter extends AbstractProcessor {
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         messageQueue = new LinkedBlockingQueue<>(context.getProperty(QUEUE_SIZE).asInteger());
-
-        tweetStreamService = new TweetStreamService(context, messageQueue, getLogger());
-        tweetStreamService.start();
+        streamStarted.set(false);
     }
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        startTweetStreamService(context);
+
         final String firstTweet = messageQueue.poll();
         if (firstTweet == null) {
             context.yield();
@@ -338,15 +343,41 @@ public class ConsumeTwitter extends AbstractProcessor {
         session.getProvenanceReporter().receive(flowFile, transitUri);
     }
 
+    /**
+     * Reacts to a primary node state notification. The tweet stream is stopped
+     * only when the reported state is {@code PRIMARY_NODE_REVOKED}; every other
+     * state is ignored here.
+     *
+     * @param newState the primary node state reported by the framework
+     */
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
+        final boolean primaryRevoked = PrimaryNodeState.PRIMARY_NODE_REVOKED == newState;
+        if (!primaryRevoked) {
+            return;
+        }
+        stopTweetStreamService();
+    }
+
     @OnStopped
     public void onStopped() {
-        if (tweetStreamService != null) {
-            tweetStreamService.stop();
-        }
-        tweetStreamService = null;
+        stopTweetStreamService();
         emptyQueue();
     }
 
+    /**
+     * Creates and starts the {@link TweetStreamService} if it is not already
+     * marked as started. The {@code streamStarted} flag is flipped atomically,
+     * so only the caller that wins the compare-and-set performs the start.
+     *
+     * If construction or start fails, the flag and the service reference are
+     * rolled back before the exception is rethrown, so that a later trigger
+     * can retry instead of polling a queue that nothing is feeding.
+     *
+     * @param context the process context used to configure the stream service
+     */
+    private void startTweetStreamService(final ProcessContext context) {
+        if (!streamStarted.compareAndSet(false, true)) {
+            return;
+        }
+
+        try {
+            tweetStreamService = new TweetStreamService(context, messageQueue, getLogger());
+            tweetStreamService.start();
+        } catch (final RuntimeException e) {
+            // Undo the "started" marker: without this the processor would never
+            // attempt to start the stream again until it is rescheduled.
+            tweetStreamService = null;
+            streamStarted.set(false);
+            throw e;
+        }
+    }
+
+    private void stopTweetStreamService() {
+        if (streamStarted.compareAndSet(true, false)) {
+            if (tweetStreamService != null) {
+                tweetStreamService.stop();
+            }
+            tweetStreamService = null;
+
+            if (!messageQueue.isEmpty()) {
+                getLogger().warn("Stopped consuming stream: unprocessed messages [{}]", messageQueue.size());
+            }
+        }
+    }
+
+
     private void emptyQueue() {
         while (!messageQueue.isEmpty()) {
             messageQueue.poll();
diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java
index 027839b5c7..f9cd8f5f1a 100644
--- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java
@@ -44,7 +44,8 @@ public class TweetStreamService {
     private final BlockingQueue<String> queue;
     private final ComponentLog logger;
 
-    private final ScheduledExecutorService executorService;
+    private ScheduledExecutorService executorService;
+    private final ThreadFactory threadFactory;
 
     private final Set<String> tweetFields;
     private final Set<String> userFields;
@@ -107,8 +108,7 @@ public class TweetStreamService {
         final String basePath = context.getProperty(ConsumeTwitter.BASE_PATH).getValue();
         api.getApiClient().setBasePath(basePath);
 
-        final ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build();
-        this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+        threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build();
     }
 
     public String getTransitUri(final String endpoint) {
@@ -128,6 +128,7 @@ public class TweetStreamService {
      * to run until {@code stop} is called.
      */
     public void start() {
+        this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
         executorService.execute(new TweetStreamStarter());
     }
 
@@ -145,6 +146,7 @@ public class TweetStreamService {
         }
 
         executorService.shutdownNow();
+        executorService = null;
     }
 
     private Long calculateBackoffDelay() {