You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/06/08 13:17:51 UTC

[6/6] flink git commit: [FLINK-3405] [nifi] Extend NiFiSource with interface StoppableFunction

[FLINK-3405] [nifi] Extend NiFiSource with interface StoppableFunction

This closes #2047


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38362c40
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38362c40
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38362c40

Branch: refs/heads/master
Commit: 38362c40bdd2e9a350630988c534b2859854d379
Parents: 6afb2b0
Author: smarthi <sm...@apache.org>
Authored: Sun May 29 02:15:16 2016 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 8 15:17:10 2016 +0200

----------------------------------------------------------------------
 .../flink-connector-nifi/pom.xml                |  2 +-
 .../streaming/connectors/nifi/NiFiSource.java   | 24 ++++++++++++++------
 2 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38362c40/flink-streaming-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-streaming-connectors/flink-connector-nifi/pom.xml
index d93bce7..a18d7b9 100644
--- a/flink-streaming-connectors/flink-connector-nifi/pom.xml
+++ b/flink-streaming-connectors/flink-connector-nifi/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
 	<!-- Allow users to pass custom connector versions -->
 	<properties>
-		<nifi.version>0.3.0</nifi.version>
+		<nifi.version>0.6.1</nifi.version>
 	</properties>
 
 	<dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/38362c40/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
index a213bb4..00b6921 100644
--- a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.nifi;
 
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.nifi.remote.Transaction;
@@ -37,7 +38,7 @@ import java.util.Map;
  * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
  * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
  */
-public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
+public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction{
 
 	private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
 
@@ -46,7 +47,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
 	private long waitTimeMs;
 	private SiteToSiteClient client;
 	private SiteToSiteClientConfig clientConfig;
-	private transient volatile boolean running;
+	private volatile boolean isRunning = true;
 
 	/**
 	 * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
@@ -72,19 +73,19 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-		running = true;
+		isRunning = true;
 	}
 
 	@Override
 	public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
 		try {
-			while (running) {
+			while (isRunning) {
 				final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
 				if (transaction == null) {
 					LOG.warn("A transaction could not be created, waiting and will try again...");
 					try {
 						Thread.sleep(waitTimeMs);
-					} catch (InterruptedException e) {
+					} catch (InterruptedException ignored) {
 
 					}
 					continue;
@@ -98,7 +99,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
 					LOG.debug("No data available to pull, waiting and will try again...");
 					try {
 						Thread.sleep(waitTimeMs);
-					} catch (InterruptedException e) {
+					} catch (InterruptedException ignored) {
 
 					}
 					continue;
@@ -134,7 +135,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
 
 	@Override
 	public void cancel() {
-		running = false;
+		isRunning = false;
 	}
 
 	@Override
@@ -143,4 +144,13 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
 		client.close();
 	}
 
+ /**
+	* {@inheritDoc}
+	* <p>
+	* Sets the {@link #isRunning} flag to {@code false}.
+	*/
+	@Override
+	public void stop() {
+		this.isRunning = false;
+	}
 }