You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/06/04 18:47:48 UTC
[3/3] flink git commit: [FLINK-2098] Improvements on
checkpoint-aligned sources
[FLINK-2098] Improvements on checkpoint-aligned sources
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4195ac0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4195ac0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4195ac0
Branch: refs/heads/master
Commit: f4195ac02764b92622316e83fa4dd31b864f2375
Parents: 6685b0b
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jun 2 03:31:43 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 4 17:51:39 2015 +0200
----------------------------------------------------------------------
.../jobgraph/tasks/CheckpointedOperator.java | 14 ++-
.../connectors/kafka/KafkaProducerExample.java | 43 +++-----
.../connectors/kafka/api/KafkaSource.java | 37 ++++---
.../api/persistent/PersistentKafkaSource.java | 104 ++++++++++---------
.../api/functions/source/ConnectorSource.java | 4 +-
.../source/FileMonitoringFunction.java | 9 +-
.../functions/source/FileSourceFunction.java | 5 +-
.../functions/source/FromElementsFunction.java | 4 +-
.../functions/source/FromIteratorFunction.java | 5 +-
.../source/FromSplittableIteratorFunction.java | 4 +-
.../source/ParallelSourceFunction.java | 2 +-
.../source/RichParallelSourceFunction.java | 11 +-
.../source/SocketTextStreamFunction.java | 4 +-
.../runtime/tasks/SourceStreamTask.java | 2 +-
14 files changed, 132 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
index 84072e1..60f70dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
@@ -18,11 +18,21 @@
package org.apache.flink.runtime.jobgraph.tasks;
+/**
+ * This interface must be implemented by invokable operators (subclasses
+ * of {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable}) that
+ * participate in state checkpoints.
+ */
public interface CheckpointedOperator {
/**
- * This method is either called directly by the checkpoint coordinator, or called
- * when all incoming channels have reported a barrier
+ * This method is either called directly and asynchronously by the checkpoint
+ * coordinator (in the case of functions that are directly notified - usually
+ * the data sources), or called synchronously when all incoming channels have
+ * reported a checkpoint barrier.
+ *
+ * @param checkpointId The ID of the checkpoint, incrementing.
+ * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
*/
void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index a29e937..4d98b1b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -17,32 +17,35 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
import org.apache.flink.util.Collector;
+@SuppressWarnings("serial")
public class KafkaProducerExample {
- private static String host;
- private static int port;
- private static String topic;
-
public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
+
+ if (args.length < 3) {
+ System.err.println("Usage: KafkaProducerExample <host> <port> <topic>");
return;
}
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
+ final String host = args[0];
+ final int port = Integer.parseInt(args[1]);
+ final String topic = args[2];
- @SuppressWarnings({ "unused", "serial" })
- DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
+
+ env.addSource(new SourceFunction<String>() {
+
+ private volatile boolean running = true;
+
@Override
public void run(Object checkpointLock, Collector<String> collector) throws Exception {
- for (int i = 0; i < 20; i++) {
+ for (int i = 0; i < 20 && running; i++) {
collector.collect("message #" + i);
Thread.sleep(100L);
}
@@ -52,26 +55,14 @@ public class KafkaProducerExample {
@Override
public void cancel() {
+ running = false;
}
- }).addSink(
- new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
- )
+ })
+ .addSink(new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
.setParallelism(3);
env.execute();
}
-
- private static boolean parseParameters(String[] args) {
- if (args.length == 3) {
- host = args[0];
- port = Integer.parseInt(args[1]);
- topic = args[2];
- return true;
- } else {
- System.err.println("Usage: KafkaProducerExample <host> <port> <topic>");
- return false;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 2fa2c26..a4c56e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -38,6 +38,12 @@ import org.slf4j.LoggerFactory;
/**
* Source that listens to a Kafka topic using the high level Kafka API.
+ *
+ * <p><b>IMPORTANT:</b> This source is not participating in the checkpointing procedure
+ * and hence gives no form of processing guarantees.
+ * Use the {@link org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource}
+ * for a fault tolerant source that provides "exactly once" processing guarantees when used with
+ * checkpointing enabled.</p>
*
* @param <OUT>
* Type of the messages on the topic.
@@ -47,20 +53,20 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+
+ private static final String DEFAULT_GROUP_ID = "flink-group";
+ private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
private final String zookeeperAddress;
private final String groupId;
private final String topicId;
- private Properties customProperties;
-
+ private final Properties customProperties;
+ private final long zookeeperSyncTimeMillis;
+
private transient ConsumerConnector consumer;
private transient ConsumerIterator<byte[], byte[]> consumerIterator;
-
- private long zookeeperSyncTimeMillis;
- private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
- private static final String DEFAULT_GROUP_ID = "flink-group";
-
- private volatile boolean isRunning = false;
+
+ private volatile boolean isRunning;
/**
* Creates a KafkaSource that consumes a topic.
@@ -100,10 +106,12 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
* Custom properties for Kafka
*/
public KafkaSource(String zookeeperAddress,
- String topicId, String groupId,
- DeserializationSchema<OUT> deserializationSchema,
- long zookeeperSyncTimeMillis, Properties customProperties) {
+ String topicId, String groupId,
+ DeserializationSchema<OUT> deserializationSchema,
+ long zookeeperSyncTimeMillis, Properties customProperties)
+ {
super(deserializationSchema);
+
Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
Preconditions.checkNotNull(topicId, "Topic ID is null");
Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
@@ -156,7 +164,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
props.put("zookeeper.sync.time.ms", Long.toString(zookeeperSyncTimeMillis));
props.put("auto.commit.interval.ms", "1000");
- if(customProperties != null) {
+ if (customProperties != null) {
for(Map.Entry<Object, Object> e : props.entrySet()) {
if(props.contains(e.getKey())) {
LOG.warn("Overwriting property "+e.getKey()+" with value "+e.getValue());
@@ -179,7 +187,9 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
@Override
public void run(Object checkpointLock, Collector<OUT> collector) throws Exception {
- isRunning = true;
+
+ // NOTE: Since this source is not checkpointed, we do not need to
+ // acquire the checkpoint lock
try {
while (isRunning && consumerIterator.hasNext()) {
OUT out = schema.deserialize(consumerIterator.next().message());
@@ -196,6 +206,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
@Override
public void open(Configuration config) throws Exception {
initializeConnection();
+ isRunning = true;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 84fd7b6..5d77a1a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Collector;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,8 +74,10 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
private final String topicName;
private final DeserializationSchema<OUT> deserializationSchema;
+
+ private final LinkedMap pendingCheckpoints = new LinkedMap();
- protected transient ConsumerConfig consumerConfig;
+ private transient ConsumerConfig consumerConfig;
private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
private transient ConsumerConnector consumer;
@@ -83,11 +86,8 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
private transient long[] commitedOffsets; // maintain committed offsets, to avoid committing the same over and over again.
private transient long[] restoreState;
- private final LinkedMap pendingCheckpoints = new LinkedMap();
+ private volatile boolean running;
- // We set this in reachedEnd to carry it over to next()
- private OUT nextElement = null;
-
/**
*
* For the @param consumerConfig, specify at least the "groupid" and "zookeeper.connect" string.
@@ -102,12 +102,12 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
this.topicName = topicName;
this.deserializationSchema = deserializationSchema;
this.consumerConfig = consumerConfig;
- if(consumerConfig.autoCommitEnable()) {
+ if (consumerConfig.autoCommitEnable()) {
throw new IllegalArgumentException("'auto.commit.enable' is set to 'true'. " +
"This source can only be used with auto commit disabled because the " +
"source is committing to zookeeper by itself (not using the KafkaConsumer).");
}
- if(!consumerConfig.offsetsStorage().equals("zookeeper")) {
+ if (!consumerConfig.offsetsStorage().equals("zookeeper")) {
// we can currently only commit to ZK.
throw new IllegalArgumentException("The 'offsets.storage' has to be set to 'zookeeper' for this Source to work reliably");
}
@@ -164,49 +164,44 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
}
Arrays.fill(this.commitedOffsets, 0); // just to make it clear
- nextElement = null;
pendingCheckpoints.clear();
+ running = true;
}
@Override
- public boolean reachedEnd() throws Exception {
- if (nextElement != null) {
- return false;
+ public void run(Object checkpointLock, Collector<OUT> collector) throws Exception {
+ if (iteratorToRead == null) {
+ throw new IllegalStateException("Kafka iterator not initialized properly.");
}
-
- while (iteratorToRead.hasNext()) {
+
+ while (running && iteratorToRead.hasNext()) {
MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
if(lastOffsets[message.partition()] >= message.offset()) {
LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
continue;
}
- lastOffsets[message.partition()] = message.offset();
+ OUT next = deserializationSchema.deserialize(message.message());
- OUT out = deserializationSchema.deserialize(message.message());
- if (deserializationSchema.isEndOfStream(out)) {
+ if (deserializationSchema.isEndOfStream(next)) {
LOG.info("DeserializationSchema signaled end of stream for this source");
break;
}
- nextElement = out;
+ // make the state update and the element emission atomic
+ synchronized (checkpointLock) {
+ lastOffsets[message.partition()] = message.offset();
+ collector.collect(next);
+ }
+
if (LOG.isTraceEnabled()) {
LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
}
- break;
}
-
- return nextElement == null;
}
@Override
- public OUT next() throws Exception {
- if (!reachedEnd()) {
- OUT result = nextElement;
- nextElement = null;
- return result;
- } else {
- throw new RuntimeException("Source exhausted");
- }
+ public void cancel() {
+ running = false;
}
@Override
@@ -222,15 +217,23 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
@Override
public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-
- if(lastOffsets == null) {
+ if (lastOffsets == null) {
LOG.warn("State snapshot requested on not yet opened source. Returning null");
return null;
}
- LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}", Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}",
+ Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+ }
long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
- pendingCheckpoints.put(checkpointId, currentOffsets);
+
+ // the map may be asynchronously updated when committing to Kafka, so we synchronize
+ synchronized (pendingCheckpoints) {
+ pendingCheckpoints.put(checkpointId, currentOffsets);
+ }
+
return currentOffsets;
}
@@ -239,8 +242,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
LOG.info("The state will be restored to {} in the open() method", Arrays.toString(state));
this.restoreState = Arrays.copyOf(state, state.length);
}
-
-
+
/**
* Notification on completed checkpoints
* @param checkpointId The ID of the checkpoint that has been completed.
@@ -248,27 +250,35 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
@Override
public void commitCheckpoint(long checkpointId) {
LOG.info("Commit checkpoint {}", checkpointId);
- final int posInMap = pendingCheckpoints.indexOf(checkpointId);
- if(posInMap == -1) {
- LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
- return;
- }
- long[] checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
- LOG.info("Committing offsets {} to ZooKeeper", Arrays.toString(checkpointOffsets));
-
- // remove older checkpoints in map:
- if(!pendingCheckpoints.isEmpty()) {
- for(int i = 0; i < posInMap; i++) {
- pendingCheckpoints.remove(0);
+ long[] checkpointOffsets;
+
+ // the map may be asynchronously updated when snapshotting state, so we synchronize
+ synchronized (pendingCheckpoints) {
+ final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+ if (posInMap == -1) {
+ LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
+ return;
}
+
+ checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
+ // remove older checkpoints in map:
+ if (!pendingCheckpoints.isEmpty()) {
+ for(int i = 0; i < posInMap; i++) {
+ pendingCheckpoints.remove(0);
+ }
+ }
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Committing offsets {} to ZooKeeper", Arrays.toString(checkpointOffsets));
}
setOffsetsInZooKeeper(checkpointOffsets);
}
private void setOffsetsInZooKeeper(long[] offsets) {
- for(int partition = 0; partition < offsets.length; partition++) {
+ for (int partition = 0; partition < offsets.length; partition++) {
long offset = offsets[partition];
if(offset != -1) {
setOffset(partition, offset);
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
index 9f59dce..0d107f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
@@ -19,12 +19,12 @@ package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT>{
+public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT> {
private static final long serialVersionUID = 1L;
+
protected DeserializationSchema<OUT> schema;
public ConnectorSource(DeserializationSchema<OUT> schema) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index ecbcee5..a9166c1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -52,7 +52,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
private Map<String, Long> offsetOfFiles;
private Map<String, Long> modificationTimes;
- private volatile boolean isRunning;
+ private volatile boolean isRunning = true;
public FileMonitoringFunction(String path, long interval, WatchType watchType) {
this.path = path;
@@ -64,7 +64,6 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
@Override
public void run(Object checkpointLock, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
- isRunning = true;
FileSystem fileSystem = FileSystem.get(new URI(path));
while (isRunning) {
@@ -117,11 +116,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
return true;
} else {
Long lastModification = modificationTimes.get(fileName);
- if (lastModification == null) {
- return false;
- } else {
- return lastModification >= modificationTime;
- }
+ return lastModification != null && lastModification >= modificationTime;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index 19ee7e1..ebbff9c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -40,7 +40,7 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
private transient InputSplitProvider provider;
private transient Iterator<InputSplit> splitIterator;
- private volatile boolean isRunning;
+ private volatile boolean isRunning = true;
@SuppressWarnings("unchecked")
public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
@@ -65,7 +65,6 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
@Override
public void close() throws Exception {
- super.close();
format.close();
}
@@ -118,8 +117,6 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
@Override
public void run(Object checkpointLock, Collector<OUT> out) throws Exception {
- isRunning = true;
-
while (isRunning) {
OUT nextElement = serializer.createInstance();
nextElement = format.nextRecord(nextElement);
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 0b323bc..838cfa4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -24,11 +24,12 @@ import java.util.Iterator;
import org.apache.flink.util.Collector;
public class FromElementsFunction<T> implements SourceFunction<T> {
+
private static final long serialVersionUID = 1L;
private Iterable<T> iterable;
- private volatile boolean isRunning;
+ private volatile boolean isRunning = true;
public FromElementsFunction(T... elements) {
this.iterable = Arrays.asList(elements);
@@ -44,7 +45,6 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
@Override
public void run(Object checkpointLock, Collector<T> out) throws Exception {
- isRunning = true;
Iterator<T> it = iterable.iterator();
while (isRunning && it.hasNext()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
index 4ee1334..7320d77 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -25,9 +25,9 @@ public class FromIteratorFunction<T> implements SourceFunction<T> {
private static final long serialVersionUID = 1L;
- Iterator<T> iterator;
+ private final Iterator<T> iterator;
- private volatile boolean isRunning;
+ private volatile boolean isRunning = true;
public FromIteratorFunction(Iterator<T> iterator) {
this.iterator = iterator;
@@ -35,7 +35,6 @@ public class FromIteratorFunction<T> implements SourceFunction<T> {
@Override
public void run(Object checkpointLock, Collector<T> out) throws Exception {
- isRunning = true;
while (isRunning && iterator.hasNext()) {
out.collect(iterator.next());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
index 97f5c06..61e1b7f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -31,7 +31,7 @@ public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunctio
private transient Iterator<T> iterator;
- private volatile boolean isRunning;
+ private volatile boolean isRunning = true;
public FromSplittableIteratorFunction(SplittableIterator<T> iterator) {
this.fullIterator = iterator;
@@ -47,8 +47,6 @@ public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunctio
@Override
public void run(Object checkpointLock, Collector<T> out) throws Exception {
- isRunning = true;
-
while (isRunning && iterator.hasNext()) {
out.collect(iterator.next());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
index c39a372..e9a739f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
@@ -25,7 +25,7 @@ package org.apache.flink.streaming.api.functions.source;
* <p>This interface acts only as a marker to tell the system that this source may
* be executed in parallel. When different parallel instances are required to perform
* different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
- * context, which revels information like the number of parallel tasks, and which parallel
+ * context, which reveals information like the number of parallel tasks, and which parallel
* task the current instance is.
*
* @param <OUT> The type of the records produced by this source.
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
index fcbcbce..7cbf674 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
@@ -20,9 +20,14 @@ package org.apache.flink.streaming.api.functions.source;
import org.apache.flink.api.common.functions.AbstractRichFunction;
/**
- * Base class for implementing a data source that has access to context information
- * (via {@link #getRuntimeContext()}) and additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
+ * Base class for implementing a parallel data source. Upon execution, the runtime will
+ * execute as many parallel instances of this function as the configured parallelism
+ * of the source.
+ *
+ * <p>The data source has access to context information (such as the number of parallel
+ * instances of the source, and which parallel instance the current instance is)
+ * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
+ * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}).</p>
*
* @param <OUT> The type of the records produced by this source.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index 8e5c3b3..c5800f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -45,7 +45,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
private static final int CONNECTION_TIMEOUT_TIME = 0;
private static final int CONNECTION_RETRY_SLEEP = 1000;
- private volatile boolean isRunning = false;
+ private volatile boolean isRunning;
public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
this.hostname = hostname;
@@ -60,6 +60,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
super.open(parameters);
socket = new Socket();
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
+ isRunning = true;
}
@Override
@@ -68,7 +69,6 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
}
public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
- isRunning = true;
try {
StringBuffer buffer = new StringBuffer();
BufferedReader reader = new BufferedReader(new InputStreamReader(
http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 186fb3f..35b5341 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -86,7 +86,7 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
@Override
public void cancel() {
- streamOperator.cancel();
super.cancel();
+ streamOperator.cancel();
}
}