You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/03/04 22:53:15 UTC
[2/3] flink git commit: [FLINK-1625] [streaming] [api-breaking] Added
proper cancellation to StreamInvokables + Sink- and SourceFunction interfaces
extended with cancel method
[FLINK-1625] [streaming] [api-breaking] Added proper cancellation to StreamInvokables + Sink- and SourceFunction interfaces extended with cancel method
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8436e9ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8436e9ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8436e9ce
Branch: refs/heads/master
Commit: 8436e9ce31b52f1bd8c55b8e8c50cafb57cff84f
Parents: 3abd6c8
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Mar 4 10:24:00 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Mar 4 22:38:41 2015 +0100
----------------------------------------------------------------------
.../streaming/connectors/flume/FlumeSink.java | 7 ++
.../streaming/connectors/flume/FlumeSource.java | 6 +-
.../connectors/kafka/KafkaProducerExample.java | 8 +-
.../connectors/kafka/api/KafkaSink.java | 37 ++++---
.../connectors/kafka/api/KafkaSource.java | 32 ++++--
.../streaming/connectors/rabbitmq/RMQSink.java | 5 +
.../connectors/rabbitmq/RMQSource.java | 42 ++++---
.../connectors/twitter/TwitterSource.java | 35 +++---
.../connectors/twitter/TwitterStreaming.java | 4 +
.../api/function/sink/FileSinkFunction.java | 9 ++
.../api/function/sink/PrintSinkFunction.java | 8 +-
.../api/function/sink/SinkFunction.java | 4 +-
.../sink/WriteSinkFunctionByMillis.java | 5 +
.../function/source/FileMonitoringFunction.java | 18 ++-
.../api/function/source/FileSourceFunction.java | 28 +++--
.../function/source/FromElementsFunction.java | 6 +-
.../function/source/GenSequenceFunction.java | 6 +-
.../source/SocketTextStreamFunction.java | 110 ++++++++++++-------
.../api/function/source/SourceFunction.java | 6 +-
.../streaming/api/invokable/SinkInvokable.java | 8 +-
.../api/invokable/SourceInvokable.java | 8 +-
.../api/invokable/StreamInvokable.java | 12 +-
.../invokable/operator/CounterInvokable.java | 6 +-
.../api/invokable/operator/FilterInvokable.java | 8 +-
.../invokable/operator/FlatMapInvokable.java | 8 +-
.../api/invokable/operator/MapInvokable.java | 8 +-
.../invokable/operator/ProjectInvokable.java | 2 +-
.../operator/StreamReduceInvokable.java | 8 +-
.../api/invokable/operator/co/CoInvokable.java | 17 ++-
.../windowing/GroupedStreamDiscretizer.java | 7 +-
.../windowing/GroupedWindowBufferInvokable.java | 8 +-
.../operator/windowing/StreamDiscretizer.java | 2 +-
.../windowing/WindowBufferInvokable.java | 11 +-
.../operator/windowing/WindowFlattener.java | 8 +-
.../operator/windowing/WindowMerger.java | 8 +-
.../operator/windowing/WindowPartitioner.java | 8 +-
.../api/streamvertex/StreamVertex.java | 7 ++
.../apache/flink/streaming/api/IterateTest.java | 4 +
.../flink/streaming/api/OutputSplitterTest.java | 16 +++
.../streaming/api/WindowCrossJoinTest.java | 8 ++
.../api/collector/DirectedOutputTest.java | 4 +
.../windowing/WindowIntegrationTest.java | 28 +++++
.../api/streamvertex/StreamVertexTest.java | 15 ++-
.../apache/flink/streaming/util/MockSource.java | 2 +-
.../streaming/examples/join/WindowJoin.java | 14 ++-
.../ml/IncrementalLearningSkeleton.java | 14 ++-
.../examples/windowing/StockPrices.java | 14 ++-
.../windowing/TopSpeedWindowingExample.java | 6 +-
.../flink/streaming/api/scala/DataStream.scala | 1 +
.../api/scala/StreamExecutionEnvironment.scala | 3 +-
.../test/classloading/jar/StreamingProgram.java | 10 +-
51 files changed, 481 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 8a2f2b8..86fd1b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -134,6 +134,13 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
}
@Override
+ public void cancel() {
+ if (client != null) {
+ client.client.close();
+ }
+ }
+
+ @Override
public void open(Configuration config) {
client = new FlinkRpcClientFacade();
client.init(host, port);
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 4f6ec2d..2a321a2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -130,7 +130,7 @@ public class FlumeSource<OUT> extends ConnectorSource<OUT> {
* The Collector for sending data to the datastream
*/
@Override
- public void invoke(Collector<OUT> collector) throws Exception {
+ public void run(Collector<OUT> collector) throws Exception {
configureAvroSource(collector);
avroSource.start();
while (!finished) {
@@ -138,4 +138,8 @@ public class FlumeSource<OUT> extends ConnectorSource<OUT> {
}
}
+ @Override
+ public void cancel() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 2c2bf80..1cd1192 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -41,7 +41,7 @@ public class KafkaProducerExample {
@SuppressWarnings({ "unused", "serial" })
DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
@Override
- public void invoke(Collector<String> collector) throws Exception {
+ public void run(Collector<String> collector) throws Exception {
for (int i = 0; i < 100; i++) {
collector.collect("message #" + i);
Thread.sleep(100L);
@@ -49,6 +49,12 @@ public class KafkaProducerExample {
collector.collect(new String("q"));
}
+
+ @Override
+ public void cancel() {
+ }
+
+
}).addSink(
new KafkaSink<String>(topic, host + ":" + port, new JavaDefaultStringSchema())
)
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 5324480..d14772b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -33,9 +33,9 @@ import org.apache.flink.streaming.connectors.util.SerializationSchema;
/**
* Sink that emits its inputs to a Kafka topic.
- *
+ *
* @param <IN>
- * Type of the sink input
+ * Type of the sink input
*/
public class KafkaSink<IN> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;
@@ -49,14 +49,15 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
private KafkaPartitioner<IN> partitioner;
/**
- * Creates a KafkaSink for a given topic. The partitioner distributes the messages between the partitions of the topics.
- *
+ * Creates a KafkaSink for a given topic. The partitioner distributes the
+ * messages between the partitions of the topics.
+ *
* @param topicId
- * ID of the Kafka topic.
+ * ID of the Kafka topic.
* @param brokerAddr
- * Address of the Kafka broker (with port number).
+ * Address of the Kafka broker (with port number).
* @param serializationSchema
- * User defined serialization schema.
+ * User defined serialization schema.
*/
public KafkaSink(String topicId, String brokerAddr,
SerializationSchema<IN, byte[]> serializationSchema) {
@@ -64,16 +65,17 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
}
/**
- * Creates a KafkaSink for a given topic. The sink produces its input into the topic.
- *
+ * Creates a KafkaSink for a given topic. The sink produces its input into
+ * the topic.
+ *
* @param topicId
- * ID of the Kafka topic.
+ * ID of the Kafka topic.
* @param brokerAddr
- * Address of the Kafka broker (with port number).
+ * Address of the Kafka broker (with port number).
* @param serializationSchema
- * User defined serialization schema.
+ * User defined serialization schema.
* @param partitioner
- * User defined partitioner.
+ * User defined partitioner.
*/
public KafkaSink(String topicId, String brokerAddr,
SerializationSchema<IN, byte[]> serializationSchema, KafkaPartitioner<IN> partitioner) {
@@ -111,9 +113,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
/**
* Called when new data arrives to the sink, and forwards it to Kafka.
- *
+ *
* @param next
- * The incoming data
+ * The incoming data
*/
@Override
public void invoke(IN next) {
@@ -132,4 +134,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
}
}
+ @Override
+ public void cancel() {
+ close();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 7a185bb..f4097e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -53,6 +53,8 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
private long zookeeperSyncTimeMillis;
private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
+ private volatile boolean isRunning = false;
+
/**
* Creates a KafkaSource that consumes a topic.
*
@@ -107,21 +109,31 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
* The Collector for sending data to the dataStream
*/
@Override
- public void invoke(Collector<OUT> collector) throws Exception {
-
- while (consumerIterator.hasNext()) {
- OUT out = schema.deserialize(consumerIterator.next().message());
- if (schema.isEndOfStream(out)) {
- break;
+ public void run(Collector<OUT> collector) throws Exception {
+ isRunning = true;
+ try {
+ while (isRunning && consumerIterator.hasNext()) {
+ OUT out = schema.deserialize(consumerIterator.next().message());
+ if (schema.isEndOfStream(out)) {
+ break;
+ }
+ collector.collect(out);
}
- collector.collect(out);
+ } finally {
+ consumer.shutdown();
}
- consumer.shutdown();
-
}
@Override
- public void open(Configuration config) {
+ public void open(Configuration config) throws Exception {
initializeConnection();
}
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ if (consumer != null) {
+ consumer.shutdown();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 38c4f5f..dae9c6d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -108,4 +108,9 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
closeChannel();
}
+ @Override
+ public void cancel() {
+ close();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 7ce864e..12ad3d6 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -46,6 +46,8 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
private transient QueueingConsumer consumer;
private transient QueueingConsumer.Delivery delivery;
+ private volatile boolean isRunning = false;
+
OUT out;
public RMQSource(String HOST_NAME, String QUEUE_NAME,
@@ -80,42 +82,46 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
* The Collector for sending data to the dataStream
*/
@Override
- public void invoke(Collector<OUT> collector) throws Exception {
-
- while (true) {
-
- try {
- delivery = consumer.nextDelivery();
- } catch (Exception e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+ public void run(Collector<OUT> collector) throws Exception {
+ isRunning = true;
+ try {
+ while (isRunning) {
+
+ try {
+ delivery = consumer.nextDelivery();
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+ }
}
- }
- out = schema.deserialize(delivery.getBody());
- if (schema.isEndOfStream(out)) {
- break;
- } else {
- collector.collect(out);
+ out = schema.deserialize(delivery.getBody());
+ if (schema.isEndOfStream(out)) {
+ break;
+ } else {
+ collector.collect(out);
+ }
}
+ } finally {
+ connection.close();
}
}
@Override
- public void open(Configuration config) {
+ public void open(Configuration config) throws Exception {
initializeConnection();
}
@Override
- public void close() {
+ public void cancel() {
+ isRunning = false;
try {
connection.close();
} catch (IOException e) {
throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+ " at " + HOST_NAME, e);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index ddb2538..740907f 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -57,6 +57,8 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
private boolean streaming;
private int numberOfTweets;
+ private volatile boolean isRunning = false;
+
/**
* Create {@link TwitterSource} for streaming
*
@@ -90,20 +92,20 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
}
@Override
- public void invoke(Collector<String> collector) throws Exception {
-
- if (streaming) {
- collectMessages(collector);
- } else {
- collectFiniteMessages(collector);
+ public void run(Collector<String> collector) throws Exception {
+ isRunning = true;
+ try {
+ if (streaming) {
+ collectMessages(collector);
+ } else {
+ collectFiniteMessages(collector);
+ }
+ } finally {
+ closeConnection();
+ isRunning = false;
}
}
- @Override
- public void close() throws Exception {
- closeConnection();
- }
-
/**
* Initialize Hosebird Client to be able to consume Twitter's Streaming API
*/
@@ -196,7 +198,7 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
LOG.info("Tweet-stream begins");
}
- while (true) {
+ while (isRunning) {
collectOneMessage(collector);
}
}
@@ -246,7 +248,8 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
/**
* Get the size of the queue in which the tweets are contained temporarily.
*
- * @return the size of the queue in which the tweets are contained temporarily
+ * @return the size of the queue in which the tweets are contained
+ * temporarily
*/
public int getQueueSize() {
return queueSize;
@@ -280,4 +283,10 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
public void setWaitSec(int waitSec) {
this.waitSec = waitSec;
}
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ closeConnection();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index a32fe1b..9be27eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -47,6 +47,10 @@ public class TwitterStreaming {
System.out.println("");
}
+ @Override
+ public void cancel() {
+ }
+
}
public static class SelectDataFlatMap extends
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
index 24beba1..5468494 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
@@ -115,4 +115,13 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
*/
protected abstract void resetParameters();
+ @Override
+ public void cancel() {
+ try {
+ close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index d460749..0fa37ac 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -84,10 +84,9 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
}
@Override
- public void close() throws Exception {
+ public void close() {
this.stream = null;
this.prefix = null;
- super.close();
}
@Override
@@ -95,4 +94,9 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
}
+ @Override
+ public void cancel() {
+ close();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 6097603..05ae34d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.functions.Function;
public interface SinkFunction<IN> extends Function, Serializable {
- public abstract void invoke(IN value) throws Exception;
+ public void invoke(IN value) throws Exception;
+
+ public void cancel();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
index ee6df94..53030f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -47,4 +47,9 @@ public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
lastTime = System.currentTimeMillis();
}
+ @Override
+ public void cancel() {
+ // No cleanup needed
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
index 05a2489..2a84c0e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
@@ -39,8 +39,10 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
public enum WatchType {
ONLY_NEW_FILES, // Only new files will be processed.
- REPROCESS_WITH_APPENDED, // When some files are appended, all contents of the files will be processed.
- PROCESS_ONLY_APPENDED // When some files are appended, only appended contents will be processed.
+ REPROCESS_WITH_APPENDED, // When some files are appended, all contents
+ // of the files will be processed.
+ PROCESS_ONLY_APPENDED // When some files are appended, only appended
+ // contents will be processed.
}
private String path;
@@ -51,6 +53,8 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
private Map<String, Long> offsetOfFiles;
private Map<String, Long> modificationTimes;
+ private volatile boolean isRunning = false;
+
public FileMonitoringFunction(String path, long interval, WatchType watchType) {
this.path = path;
this.interval = interval;
@@ -60,10 +64,11 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
}
@Override
- public void invoke(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+ public void run(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+ isRunning = true;
fileSystem = FileSystem.get(new URI(path));
- while (true) {
+ while (isRunning) {
List<String> files = listNewFiles();
for (String filePath : files) {
if (watchType == WatchType.ONLY_NEW_FILES
@@ -120,4 +125,9 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
}
}
}
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index dcf67a9..d7df266 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -38,6 +38,8 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
private TypeInformation<String> typeInfo;
+ private volatile boolean isRunning;
+
public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
this.inputFormat = format;
this.typeInfo = typeInfo;
@@ -51,33 +53,32 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
}
@Override
- public void invoke(Collector<String> collector) throws Exception {
+ public void run(Collector<String> collector) throws Exception {
+ isRunning = true;
final TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext()
.getExecutionConfig());
final Iterator<InputSplit> splitIterator = getInputSplits();
@SuppressWarnings("unchecked")
final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
try {
- while (splitIterator.hasNext()) {
+ while (isRunning && splitIterator.hasNext()) {
final InputSplit split = splitIterator.next();
String record = serializer.createInstance();
format.open(split);
- try {
- while (!format.reachedEnd()) {
- if ((record = format.nextRecord(record)) != null) {
- collector.collect(record);
- }
+ while (!format.reachedEnd()) {
+ if ((record = format.nextRecord(record)) != null) {
+ collector.collect(record);
}
- } finally {
- format.close();
}
+
}
collector.close();
- } catch (Exception ex) {
- ex.printStackTrace();
+ } finally {
+ format.close();
}
+ isRunning = false;
}
private Iterator<InputSplit> getInputSplits() {
@@ -126,4 +127,9 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
}
};
}
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index 8afac75..97a3a92 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -40,10 +40,14 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
}
@Override
- public void invoke(Collector<T> collector) throws Exception {
+ public void run(Collector<T> collector) throws Exception {
for (T element : iterable) {
collector.collect(element);
}
}
+ @Override
+ public void cancel() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index 3afd06e..eccc146 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -37,7 +37,7 @@ public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
}
@Override
- public void invoke(Collector<Long> collector) throws Exception {
+ public void run(Collector<Long> collector) throws Exception {
while (splitIterator.hasNext()) {
collector.collect(splitIterator.next());
}
@@ -50,4 +50,8 @@ public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
}
+ @Override
+ public void cancel() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
index 3253c01..67bc128 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -18,10 +18,12 @@
package org.apache.flink.streaming.api.function.source;
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
@@ -33,7 +35,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
protected static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
private static final long serialVersionUID = 1L;
-
+
private String hostname;
private int port;
private char delimiter;
@@ -43,6 +45,8 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
+ private volatile boolean isRunning = false;
+
public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
this.hostname = hostname;
this.port = port;
@@ -55,65 +59,91 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
socket = new Socket();
-
+
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
}
-
+
@Override
- public void invoke(Collector<String> collector) throws Exception {
+ public void run(Collector<String> collector) throws Exception {
streamFromSocket(collector, socket);
}
public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
- StringBuffer buffer = new StringBuffer();
- BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ isRunning = true;
+ try {
+ StringBuffer buffer = new StringBuffer();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(
+ socket.getInputStream()));
- while (true) {
- int data = reader.read();
- if (data == -1) {
- socket.close();
- long retry = 0;
- boolean success = false;
- while (retry < maxRetry && !success) {
- if (!retryForever) {
- retry++;
+ while (isRunning) {
+ int data;
+ try {
+ data = reader.read();
+ } catch (SocketException e) {
+ if (!isRunning) {
+ break;
+ } else {
+ throw e;
}
- LOG.warn("Lost connection to server socket. Retrying in " + (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
- try {
- socket = new Socket();
- socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
- success = true;
- } catch (ConnectException ce) {
- Thread.sleep(CONNECTION_RETRY_SLEEP);
+ }
+
+ if (data == -1) {
+ socket.close();
+ long retry = 0;
+ boolean success = false;
+ while (retry < maxRetry && !success) {
+ if (!retryForever) {
+ retry++;
+ }
+ LOG.warn("Lost connection to server socket. Retrying in "
+ + (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
+ try {
+ socket = new Socket();
+ socket.connect(new InetSocketAddress(hostname, port),
+ CONNECTION_TIMEOUT_TIME);
+ success = true;
+ } catch (ConnectException ce) {
+ Thread.sleep(CONNECTION_RETRY_SLEEP);
+ }
}
+
+ if (success) {
+ LOG.info("Server socket is reconnected.");
+ } else {
+ LOG.error("Could not reconnect to server socket.");
+ break;
+ }
+ reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ continue;
}
- if (success) {
- LOG.info("Server socket is reconnected.");
- } else {
- LOG.error("Could not reconnect to server socket.");
- break;
+ if (data == delimiter) {
+ collector.collect(buffer.toString());
+ buffer = new StringBuffer();
+ } else if (data != '\r') { // ignore carriage return
+ buffer.append((char) data);
}
- reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- continue;
}
- if (data == delimiter) {
+ if (buffer.length() > 0) {
collector.collect(buffer.toString());
- buffer = new StringBuffer();
- } else if (data != '\r') { // ignore carriage return
- buffer.append((char) data);
}
- }
-
- if (buffer.length() > 0) {
- collector.collect(buffer.toString());
+ } finally {
+ socket.close();
}
}
@Override
- public void close() throws Exception {
- socket.close();
- super.close();
+ public void cancel() {
+ isRunning = false;
+ if (socket != null && !socket.isClosed()) {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Could not close open socket");
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 917562a..4f579fe 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -24,6 +24,8 @@ import org.apache.flink.util.Collector;
public interface SourceFunction<OUT> extends Function, Serializable {
- public void invoke(Collector<OUT> collector) throws Exception;
+ public void run(Collector<OUT> collector) throws Exception;
+
+ public void cancel();
-}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 13a6ba1..35060fd 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -31,7 +31,7 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}
@@ -47,4 +47,10 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
callUserFunctionAndLogException();
}
+ @Override
+ public void cancel() {
+ super.cancel();
+ sinkFunction.cancel();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index f1cf2c5..c3f25a0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -39,6 +39,12 @@ public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements S
@Override
protected void callUserFunction() throws Exception {
- sourceFunction.invoke(collector);
+ sourceFunction.run(collector);
+ }
+
+ @Override
+ public void cancel() {
+ super.cancel();
+ sourceFunction.cancel();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index db7b642..85fb9a4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -107,7 +107,13 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
}
return nextRecord;
} catch (IOException e) {
- throw new RuntimeException("Could not read next record.");
+ if (isRunning) {
+ throw new RuntimeException("Could not read next record due to: "
+ + StringUtils.stringifyException(e));
+ } else {
+ // Task already cancelled, so do nothing
+ return null;
+ }
}
}
@@ -159,6 +165,10 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
}
}
+ public void cancel() {
+ isRunning = false;
+ }
+
public void setRuntimeContext(RuntimeContext t) {
FunctionUtils.setFunctionRuntimeContext(userFunction, t);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
index 3fc314c..8bb546c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -30,14 +30,16 @@ public class CounterInvokable<IN> extends ChainableInvokable<IN, Long> {
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
collector.collect(++count);
}
}
@Override
public void collect(IN record) {
- collector.collect(++count);
+ if (isRunning) {
+ collector.collect(++count);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 0c8298e..ab3f147 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -34,7 +34,7 @@ public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}
@@ -49,7 +49,9 @@ public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
@Override
public void collect(IN record) {
- nextObject = copy(record);
- callUserFunctionAndLogException();
+ if (isRunning) {
+ nextObject = copy(record);
+ callUserFunctionAndLogException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 2a4081b..025bd32 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -32,7 +32,7 @@ public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}
@@ -44,8 +44,10 @@ public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
@Override
public void collect(IN record) {
- nextObject = copy(record);
- callUserFunctionAndLogException();
+ if (isRunning) {
+ nextObject = copy(record);
+ callUserFunctionAndLogException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 7c8e577..8fc1f13 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -32,7 +32,7 @@ public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}
@@ -44,7 +44,9 @@ public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
@Override
public void collect(IN record) {
- nextObject = copy(record);
- callUserFunctionAndLogException();
+ if (isRunning) {
+ nextObject = copy(record);
+ callUserFunctionAndLogException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 31689c7..3e47107 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -41,7 +41,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index fe6c41a..e7fa2b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -35,7 +35,7 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
reduce();
}
}
@@ -62,8 +62,10 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
@Override
public void collect(IN record) {
- nextObject = copy(record);
- callUserFunctionAndLogException();
+ if (isRunning) {
+ nextObject = copy(record);
+ callUserFunctionAndLogException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 604873e..b41dbbb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -17,6 +17,8 @@
package org.apache.flink.streaming.api.invokable.operator.co;
+import java.io.IOException;
+
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -76,8 +78,19 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
@Override
public void invoke() throws Exception {
- while (true) {
- int next = recordIterator.next(reuse1, reuse2);
+ while (isRunning) {
+ int next;
+ try {
+ next = recordIterator.next(reuse1, reuse2);
+ } catch (IOException e) {
+ if (isRunning) {
+ throw e;
+ } else {
+ // Task already cancelled, so do nothing
+ next = 0;
+ }
+ }
+
if (next == 0) {
break;
} else if (next == 1) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
index 5e21a31..f14a6ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
@@ -60,11 +60,8 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
@Override
public void invoke() throws Exception {
- if (readNext() == null) {
- throw new RuntimeException("DataStream must not be empty");
- }
- while (nextRecord != null) {
+ while (isRunning && readNext() != null) {
Object key = keySelector.getKey(nextObject);
@@ -76,8 +73,6 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
}
groupDiscretizer.processRealElement(nextObject);
-
- readNext();
}
for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
index 53c87c3..2c3bd75 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
@@ -42,7 +42,7 @@ public class GroupedWindowBufferInvokable<T> extends WindowBufferInvokable<T> {
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}
@@ -64,8 +64,10 @@ public class GroupedWindowBufferInvokable<T> extends WindowBufferInvokable<T> {
@Override
public void collect(WindowEvent<T> record) {
- nextObject = record;
- callUserFunctionAndLogException();
+ if (isRunning) {
+ nextObject = record;
+ callUserFunctionAndLogException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index 104196e..e668b66 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -71,7 +71,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
public void invoke() throws Exception {
// Continuously run
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
processRealElement(nextObject);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index ea4b830..75f7d9d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -26,8 +26,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
* This invokable flattens the results of the window transformations by
* outputing the elements of the {@link StreamWindow} one-by-one
*/
-public class WindowBufferInvokable<T> extends
- ChainableInvokable<WindowEvent<T>, StreamWindow<T>> {
+public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>, StreamWindow<T>> {
protected WindowBuffer<T> buffer;
@@ -40,7 +39,7 @@ public class WindowBufferInvokable<T> extends
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}
@@ -67,8 +66,10 @@ public class WindowBufferInvokable<T> extends
@Override
public void collect(WindowEvent<T> record) {
- nextObject = record;
- callUserFunctionAndLogException();
+ if (isRunning) {
+ nextObject = record;
+ callUserFunctionAndLogException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
index edefeef..0ff4724 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
@@ -34,7 +34,7 @@ public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}
@@ -48,8 +48,10 @@ public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
@Override
public void collect(StreamWindow<T> record) {
- nextObject = record;
- callUserFunctionAndLogException();
+ if (isRunning) {
+ nextObject = record;
+ callUserFunctionAndLogException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
index a58bb9f..f425255 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
@@ -41,7 +41,7 @@ public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamW
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}
@@ -69,7 +69,9 @@ public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamW
@Override
public void collect(StreamWindow<T> record) {
- nextObject = record;
- callUserFunctionAndLogException();
+ if (isRunning) {
+ nextObject = record;
+ callUserFunctionAndLogException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
index e010af4..846650d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
@@ -44,7 +44,7 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
@Override
public void invoke() throws Exception {
- while (readNext() != null) {
+ while (isRunning && readNext() != null) {
callUserFunctionAndLogException();
}
}
@@ -71,8 +71,10 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
@Override
public void collect(StreamWindow<T> record) {
- nextObject = record;
- callUserFunctionAndLogException();
+ if (isRunning) {
+ nextObject = record;
+ callUserFunctionAndLogException();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index bd25e72..99ca098 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -154,6 +154,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
}
@Override
+ public void cancel() {
+ if (userInvokable != null) {
+ userInvokable.cancel();
+ }
+ }
+
+ @Override
public StreamConfig getConfig() {
return configuration;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 6ad827a..92d23aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -69,6 +69,10 @@ public class IterateTest {
public void invoke(Boolean tuple) {
}
+ @Override
+ public void cancel() {
+ }
+
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index 2486715..a214fbf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -76,6 +76,10 @@ public class OutputSplitterTest {
public void invoke(Integer value) {
splitterResult1.add(value);
}
+
+ @Override
+ public void cancel() {
+ }
});
d1.split(new OutputSelector<Integer>() {
@@ -98,6 +102,10 @@ public class OutputSplitterTest {
public void invoke(Integer value) {
splitterResult2.add(value);
}
+
+ @Override
+ public void cancel() {
+ }
});
env.execute();
@@ -144,6 +152,10 @@ public class OutputSplitterTest {
public void invoke(Integer value) {
splitterResult1.add(value);
}
+
+ @Override
+ public void cancel() {
+ }
});
ds.split(new OutputSelector<Integer>() {
@@ -168,6 +180,10 @@ public class OutputSplitterTest {
public void invoke(Integer value) {
splitterResult2.add(value);
}
+
+ @Override
+ public void cancel() {
+ }
});
env.execute();
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index dc4932e..e14e281 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -143,6 +143,10 @@ public class WindowCrossJoinTest implements Serializable {
public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
}
+
+ @Override
+ public void cancel() {
+ }
}
private static class CrossResultSink implements
@@ -153,5 +157,9 @@ public class WindowCrossJoinTest implements Serializable {
public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
}
+
+ @Override
+ public void cancel() {
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 38bba5e..9d166e5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -88,6 +88,10 @@ public class DirectedOutputTest {
outputs.put(name, new ArrayList<Long>());
this.list = outputs.get(name);
}
+
+ @Override
+ public void cancel() {
+ }
}
private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 3163c46..2ed0002 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -208,6 +208,10 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
+ @Override
+ public void cancel() {
+ }
+
}
@SuppressWarnings("serial")
@@ -221,6 +225,10 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
+ @Override
+ public void cancel() {
+ }
+
}
@SuppressWarnings("serial")
@@ -234,6 +242,10 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
+ @Override
+ public void cancel() {
+ }
+
}
@SuppressWarnings("serial")
@@ -247,6 +259,10 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
+ @Override
+ public void cancel() {
+ }
+
}
@SuppressWarnings("serial")
@@ -260,6 +276,10 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
+ @Override
+ public void cancel() {
+ }
+
}
@SuppressWarnings("serial")
@@ -273,6 +293,10 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
+ @Override
+ public void cancel() {
+ }
+
}
@SuppressWarnings("serial")
@@ -286,5 +310,9 @@ public class WindowIntegrationTest implements Serializable {
windows.add(value);
}
+ @Override
+ public void cancel() {
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index 4f01a8b..18a36ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -50,12 +50,17 @@ public class StreamVertexTest {
private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
@Override
- public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+ public void run(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 10; i++) {
tuple.f0 = i;
collector.collect(tuple);
}
}
+
+ @Override
+ public void cancel() {
+ // No cleanup needed
+ }
}
public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
@@ -77,6 +82,10 @@ public class StreamVertexTest {
Integer v = tuple.getField(1);
data.put(k, v);
}
+
+ @Override
+ public void cancel() {
+ }
}
@SuppressWarnings("unused")
@@ -142,6 +151,10 @@ public class StreamVertexTest {
public void invoke(String value) {
result.add(value);
}
+
+ @Override
+ public void cancel() {
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
index bb92e8e..4cf02ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
@@ -27,7 +27,7 @@ public class MockSource<T> {
public static <T> List<T> createAndExecute(SourceFunction<T> source) {
List<T> outputs = new ArrayList<T>();
try {
- source.invoke(new MockCollector<T>(outputs));
+ source.run(new MockCollector<T>(outputs));
} catch (Exception e) {
throw new RuntimeException("Cannot invoke source.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index dcfed50..a5a9577 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -108,7 +108,7 @@ public class WindowJoin {
}
@Override
- public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+ public void run(Collector<Tuple2<String, Integer>> out) throws Exception {
while (true) {
outTuple.f0 = names[rand.nextInt(names.length)];
outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
@@ -116,6 +116,11 @@ public class WindowJoin {
Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
}
}
+
+ @Override
+ public void cancel() {
+ // No cleanup needed
+ }
}
/**
@@ -134,7 +139,7 @@ public class WindowJoin {
}
@Override
- public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+ public void run(Collector<Tuple2<String, Integer>> out) throws Exception {
while (true) {
outTuple.f0 = names[rand.nextInt(names.length)];
outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
@@ -142,6 +147,11 @@ public class WindowJoin {
Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
}
}
+
+ @Override
+ public void cancel() {
+ // No cleanup needed
+ }
}
public static class MyJoinFunction
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 4cdb7c6..26895f2 100755
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -93,7 +93,7 @@ public class IncrementalLearningSkeleton {
private static final int NEW_DATA_SLEEP_TIME = 1000;
@Override
- public void invoke(Collector<Integer> collector) throws Exception {
+ public void run(Collector<Integer> collector) throws Exception {
while (true) {
collector.collect(getNewData());
}
@@ -103,6 +103,11 @@ public class IncrementalLearningSkeleton {
Thread.sleep(NEW_DATA_SLEEP_TIME);
return 1;
}
+
+ @Override
+ public void cancel() {
+ // No cleanup needed
+ }
}
/**
@@ -114,7 +119,7 @@ public class IncrementalLearningSkeleton {
private static final int TRAINING_DATA_SLEEP_TIME = 10;
@Override
- public void invoke(Collector<Integer> collector) throws Exception {
+ public void run(Collector<Integer> collector) throws Exception {
while (true) {
collector.collect(getTrainingData());
}
@@ -126,6 +131,11 @@ public class IncrementalLearningSkeleton {
return 1;
}
+
+ @Override
+ public void cancel() {
+ // No cleanup needed
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
index 9bf851e..ec99026 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -267,7 +267,7 @@ public class StockPrices {
}
@Override
- public void invoke(Collector<StockPrice> collector) throws Exception {
+ public void run(Collector<StockPrice> collector) throws Exception {
price = DEFAULT_PRICE;
Random random = new Random();
@@ -277,6 +277,11 @@ public class StockPrices {
Thread.sleep(random.nextInt(200));
}
}
+
+ @Override
+ public void cancel() {
+ // No cleanup needed
+ }
}
public final static class WindowMean implements WindowMapFunction<StockPrice, StockPrice> {
@@ -307,7 +312,7 @@ public class StockPrices {
StringBuilder stringBuilder;
@Override
- public void invoke(Collector<String> collector) throws Exception {
+ public void run(Collector<String> collector) throws Exception {
random = new Random();
stringBuilder = new StringBuilder();
@@ -322,6 +327,11 @@ public class StockPrices {
}
}
+
+ @Override
+ public void cancel() {
+ // No cleanup needed
+ }
}
public static final class SendWarning implements WindowMapFunction<StockPrice, String> {
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index 5e73fd6..311c6b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -87,7 +87,7 @@ public class TopSpeedWindowingExample {
}
@Override
- public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> collector)
+ public void run(Collector<Tuple4<Integer, Integer, Double, Long>> collector)
throws Exception {
while (true) {
@@ -104,6 +104,10 @@ public class TopSpeedWindowingExample {
}
}
}
+
+ @Override
+ public void cancel() {
+ }
}
private static int numOfCars = 2;
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3ab4ff1..d4df1d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -568,6 +568,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
val sinkFunction = new SinkFunction[T] {
val cleanFun = clean(fun)
def invoke(in: T) = cleanFun(in)
+ def cancel() = {}
}
this.addSink(sinkFunction)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 00d3704..1212b2b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -224,9 +224,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
Validate.notNull(function, "Function must not be null.")
val sourceFunction = new SourceFunction[T] {
val cleanFun = StreamExecutionEnvironment.clean(function)
- override def invoke(out: Collector[T]) {
+ override def run(out: Collector[T]) {
cleanFun(out)
}
+ override def cancel() = {}
}
addSource(sourceFunction)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index f7f9eae..18b52c5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -18,17 +18,15 @@
package org.apache.flink.test.classloading.jar;
+import java.util.StringTokenizer;
+
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.util.Collector;
-import java.util.StringTokenizer;
-
@SuppressWarnings("serial")
public class StreamingProgram {
@@ -100,5 +98,9 @@ public class StreamingProgram {
@Override
public void invoke(Word value) throws Exception {
}
+
+ @Override
+ public void cancel() {
+ }
}
}