You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/06/08 13:17:51 UTC
[6/6] flink git commit: [FLINK-3405] [nifi] Extend NiFiSource with interface StoppableFunction
[FLINK-3405] [nifi] Extend NiFiSource with interface StoppableFunction
This closes #2047
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38362c40
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38362c40
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38362c40
Branch: refs/heads/master
Commit: 38362c40bdd2e9a350630988c534b2859854d379
Parents: 6afb2b0
Author: smarthi <sm...@apache.org>
Authored: Sun May 29 02:15:16 2016 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 8 15:17:10 2016 +0200
----------------------------------------------------------------------
.../flink-connector-nifi/pom.xml | 2 +-
.../streaming/connectors/nifi/NiFiSource.java | 24 ++++++++++++++------
2 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/38362c40/flink-streaming-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-streaming-connectors/flink-connector-nifi/pom.xml
index d93bce7..a18d7b9 100644
--- a/flink-streaming-connectors/flink-connector-nifi/pom.xml
+++ b/flink-streaming-connectors/flink-connector-nifi/pom.xml
@@ -37,7 +37,7 @@ under the License.
<!-- Allow users to pass custom connector versions -->
<properties>
- <nifi.version>0.3.0</nifi.version>
+ <nifi.version>0.6.1</nifi.version>
</properties>
<dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/38362c40/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
index a213bb4..00b6921 100644
--- a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.connectors.nifi;
+import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.nifi.remote.Transaction;
@@ -37,7 +38,7 @@ import java.util.Map;
* A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
* produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
*/
-public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
+public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction{
private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
@@ -46,7 +47,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
private long waitTimeMs;
private SiteToSiteClient client;
private SiteToSiteClientConfig clientConfig;
- private transient volatile boolean running;
+ private volatile boolean isRunning = true;
/**
* Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
@@ -72,19 +73,19 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
- running = true;
+ isRunning = true;
}
@Override
public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
try {
- while (running) {
+ while (isRunning) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
if (transaction == null) {
LOG.warn("A transaction could not be created, waiting and will try again...");
try {
Thread.sleep(waitTimeMs);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignored) {
}
continue;
@@ -98,7 +99,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
LOG.debug("No data available to pull, waiting and will try again...");
try {
Thread.sleep(waitTimeMs);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignored) {
}
continue;
@@ -134,7 +135,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
@Override
public void cancel() {
- running = false;
+ isRunning = false;
}
@Override
@@ -143,4 +144,13 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
client.close();
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Sets the {@link #isRunning} flag to {@code false}.
+ */
+ @Override
+ public void stop() {
+ this.isRunning = false;
+ }
}