You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/02/03 19:13:49 UTC
[nifi] branch main updated: NIFI-11092 Fixed ConsumeTwitter handling on Primary Node changes
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0b61a6226c NIFI-11092 Fixed ConsumeTwitter handling on Primary Node changes
0b61a6226c is described below
commit 0b61a6226c359fb97d098eb4c536294f04a4255a
Author: Emilio Setiadarma <em...@gmail.com>
AuthorDate: Thu Jan 26 15:22:19 2023 -0800
NIFI-11092 Fixed ConsumeTwitter handling on Primary Node changes
This closes #6901
Signed-off-by: David Handermann <ex...@apache.org>
---
.../nifi/processors/twitter/ConsumeTwitter.java | 45 ++++++++++++++++++----
.../processors/twitter/TweetStreamService.java | 8 ++--
2 files changed, 43 insertions(+), 10 deletions(-)
diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java
index 041fd80dba..2517456d74 100644
--- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java
@@ -26,6 +26,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
@@ -49,6 +51,7 @@ import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@PrimaryNodeOnly
@@ -255,6 +258,8 @@ public class ConsumeTwitter extends AbstractProcessor {
private volatile BlockingQueue<String> messageQueue;
+ private final AtomicBoolean streamStarted = new AtomicBoolean(false); // true while a TweetStreamService has been started by startTweetStreamService and not yet stopped
+
@Override
protected void init(ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
@@ -296,13 +301,13 @@ public class ConsumeTwitter extends AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
messageQueue = new LinkedBlockingQueue<>(context.getProperty(QUEUE_SIZE).asInteger());
-
- tweetStreamService = new TweetStreamService(context, messageQueue, getLogger());
- tweetStreamService.start();
+ streamStarted.set(false); // the stream is no longer started here; clearing the flag lets the next onTrigger call start it
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ startTweetStreamService(context);
+
final String firstTweet = messageQueue.poll();
if (firstTweet == null) {
context.yield();
@@ -338,15 +343,41 @@ public class ConsumeTwitter extends AbstractProcessor {
session.getProvenanceReporter().receive(flowFile, transitUri);
}
+ @OnPrimaryNodeStateChange
+ public void onPrimaryNodeStateChange(final PrimaryNodeState newState) { // Stops the tweet stream when the reported state is PRIMARY_NODE_REVOKED.
+ if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) { // any other state is ignored here; starting happens from onTrigger
+ stopTweetStreamService();
+ }
+ }
+
@OnStopped
public void onStopped() {
- if (tweetStreamService != null) {
- tweetStreamService.stop();
- }
- tweetStreamService = null;
+ stopTweetStreamService(); // no-op if the stream was never started (guarded by streamStarted); the queue is drained by emptyQueue() below
emptyQueue();
}
+ private void startTweetStreamService(final ProcessContext context) { // Creates and starts the TweetStreamService once per false->true transition of streamStarted; called at the top of onTrigger.
+ if (streamStarted.compareAndSet(false, true)) { // only the caller that flips the flag creates the service
+ tweetStreamService = new TweetStreamService(context, messageQueue, getLogger()); // NOTE(review): the flag is already true here; if this constructor or start() throws, it stays true and later triggers skip the start until a stop resets it - confirm this is intended
+ tweetStreamService.start();
+ }
+
+ }
+
+ private void stopTweetStreamService() { // Stops the service if the stream was marked started and clears the flag; does not drain messageQueue.
+ if (streamStarted.compareAndSet(true, false)) { // no-op unless the stream is currently marked as started
+ if (tweetStreamService != null) {
+ tweetStreamService.stop();
+ }
+ tweetStreamService = null; // drop the reference; startTweetStreamService builds a new instance on the next start
+
+ if (!messageQueue.isEmpty()) { // leftovers are only reported here; onStopped() removes them via emptyQueue()
+ getLogger().warn("Stopped consuming stream: unprocessed messages [{}]", messageQueue.size());
+ }
+ }
+ }
+
+
private void emptyQueue() {
while (!messageQueue.isEmpty()) {
messageQueue.poll();
diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java
index 027839b5c7..f9cd8f5f1a 100644
--- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java
@@ -44,7 +44,8 @@ public class TweetStreamService {
private final BlockingQueue<String> queue;
private final ComponentLog logger;
- private final ScheduledExecutorService executorService;
+ private ScheduledExecutorService executorService; // no longer final: assigned in start() and set to null at the end of stop()
+ private final ThreadFactory threadFactory; // built once in the constructor and reused for every executor created by start()
private final Set<String> tweetFields;
private final Set<String> userFields;
@@ -107,8 +108,7 @@ public class TweetStreamService {
final String basePath = context.getProperty(ConsumeTwitter.BASE_PATH).getValue();
api.getApiClient().setBasePath(basePath);
- final ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build();
- this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build();
}
public String getTransitUri(final String endpoint) {
@@ -128,6 +128,7 @@ public class TweetStreamService {
* to run until {@code stop} is called.
*/
public void start() {
+ this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); // new executor per start(); NOTE(review): a second start() without stop() would overwrite the previous executor without shutting it down - confirm callers never do that
executorService.execute(new TweetStreamStarter());
}
@@ -145,6 +146,7 @@ public class TweetStreamService {
}
executorService.shutdownNow();
+ executorService = null;
}
private Long calculateBackoffDelay() {