You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/06/04 18:47:48 UTC

[3/3] flink git commit: [FLINK-2098] Improvements on checkpoint-aligned sources

[FLINK-2098] Improvements on checkpoint-aligned sources


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4195ac0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4195ac0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4195ac0

Branch: refs/heads/master
Commit: f4195ac02764b92622316e83fa4dd31b864f2375
Parents: 6685b0b
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jun 2 03:31:43 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jun 4 17:51:39 2015 +0200

----------------------------------------------------------------------
 .../jobgraph/tasks/CheckpointedOperator.java    |  14 ++-
 .../connectors/kafka/KafkaProducerExample.java  |  43 +++-----
 .../connectors/kafka/api/KafkaSource.java       |  37 ++++---
 .../api/persistent/PersistentKafkaSource.java   | 104 ++++++++++---------
 .../api/functions/source/ConnectorSource.java   |   4 +-
 .../source/FileMonitoringFunction.java          |   9 +-
 .../functions/source/FileSourceFunction.java    |   5 +-
 .../functions/source/FromElementsFunction.java  |   4 +-
 .../functions/source/FromIteratorFunction.java  |   5 +-
 .../source/FromSplittableIteratorFunction.java  |   4 +-
 .../source/ParallelSourceFunction.java          |   2 +-
 .../source/RichParallelSourceFunction.java      |  11 +-
 .../source/SocketTextStreamFunction.java        |   4 +-
 .../runtime/tasks/SourceStreamTask.java         |   2 +-
 14 files changed, 132 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
index 84072e1..60f70dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointedOperator.java
@@ -18,11 +18,21 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
+/**
+ * This interface must be implemented by invokable operators (subclasses
+ * of {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable}) that
+ * participate in state checkpoints.
+ */
 public interface CheckpointedOperator {
 
 	/**
-	 * This method is either called directly by the checkpoint coordinator, or called
-	 * when all incoming channels have reported a barrier
+	 * This method is either called directly and asynchronously by the checkpoint
+	 * coordinator (in the case of functions that are directly notified - usually
+	 * the data sources), or called synchronously when all incoming channels have
+	 * reported a checkpoint barrier.
+	 * 
+	 * @param checkpointId The ID of the checkpoint (checkpoint IDs are incrementing).
+	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
 	 */
 	void triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index a29e937..4d98b1b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -17,32 +17,35 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.util.Collector;
 
+@SuppressWarnings("serial")
 public class KafkaProducerExample {
 
-	private static String host;
-	private static int port;
-	private static String topic;
-
 	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
+		
+		if (args.length < 3) {
+			System.err.println("Usage: KafkaProducerExample <host> <port> <topic>");
 			return;
 		}
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
+		final String host = args[0];
+		final int port = Integer.parseInt(args[1]);
+		final String topic = args[2];
 
-		@SuppressWarnings({ "unused", "serial" })
-		DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
+		
+		env.addSource(new SourceFunction<String>() {
+			
+			private volatile boolean running = true;
+			
 			@Override
 			public void run(Object checkpointLock, Collector<String> collector) throws Exception {
-				for (int i = 0; i < 20; i++) {
+				for (int i = 0; i < 20 && running; i++) {
 					collector.collect("message #" + i);
 					Thread.sleep(100L);
 				}
@@ -52,26 +55,14 @@ public class KafkaProducerExample {
 
 			@Override
 			public void cancel() {
+				running = false;
 			}
 
 
-		}).addSink(
-				new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
-		)
+		})
+			.addSink(new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
 				.setParallelism(3);
 
 		env.execute();
 	}
-
-	private static boolean parseParameters(String[] args) {
-		if (args.length == 3) {
-			host = args[0];
-			port = Integer.parseInt(args[1]);
-			topic = args[2];
-			return true;
-		} else {
-			System.err.println("Usage: KafkaProducerExample <host> <port> <topic>");
-			return false;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 2fa2c26..a4c56e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -38,6 +38,12 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Source that listens to a Kafka topic using the high level Kafka API.
+ * 
+ * <p><b>IMPORTANT:</b> This source does not participate in the checkpointing procedure
+ * and hence gives no form of processing guarantees.
+ * Use the {@link org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource}
+ * for a fault-tolerant source that provides "exactly once" processing guarantees when used with
+ * checkpointing enabled.</p>
  *
  * @param <OUT>
  *            Type of the messages on the topic.
@@ -47,20 +53,20 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+	
+	private static final String DEFAULT_GROUP_ID = "flink-group";
+	private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
 
 	private final String zookeeperAddress;
 	private final String groupId;
 	private final String topicId;
-	private Properties customProperties;
-
+	private final Properties customProperties;
+	private final long zookeeperSyncTimeMillis;
+	
 	private transient ConsumerConnector consumer;
 	private transient ConsumerIterator<byte[], byte[]> consumerIterator;
-
-	private long zookeeperSyncTimeMillis;
-	private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
-	private static final String DEFAULT_GROUP_ID = "flink-group";
-
-	private volatile boolean isRunning = false;
+	
+	private volatile boolean isRunning;
 
 	/**
 	 * Creates a KafkaSource that consumes a topic.
@@ -100,10 +106,12 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	 * 			  Custom properties for Kafka
 	 */
 	public KafkaSource(String zookeeperAddress,
-		String topicId, String groupId,
-		DeserializationSchema<OUT> deserializationSchema,
-		long zookeeperSyncTimeMillis, Properties customProperties) {
+						String topicId, String groupId,
+						DeserializationSchema<OUT> deserializationSchema,
+						long zookeeperSyncTimeMillis, Properties customProperties)
+	{
 		super(deserializationSchema);
+		
 		Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
 		Preconditions.checkNotNull(topicId, "Topic ID is null");
 		Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
@@ -156,7 +164,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 		props.put("zookeeper.sync.time.ms", Long.toString(zookeeperSyncTimeMillis));
 		props.put("auto.commit.interval.ms", "1000");
 
-		if(customProperties != null) {
+		if (customProperties != null) {
 			for(Map.Entry<Object, Object> e : props.entrySet()) {
 				if(props.contains(e.getKey())) {
 					LOG.warn("Overwriting property "+e.getKey()+" with value "+e.getValue());
@@ -179,7 +187,9 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	@Override
 	public void run(Object checkpointLock, Collector<OUT> collector) throws Exception {
-		isRunning = true;
+		
+		// NOTE: Since this source is not checkpointed, we do not need to
+		// acquire the checkpoint lock
 		try {
 			while (isRunning && consumerIterator.hasNext()) {
 				OUT out = schema.deserialize(consumerIterator.next().message());
@@ -196,6 +206,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	@Override
 	public void open(Configuration config) throws Exception {
 		initializeConnection();
+		isRunning = true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 84fd7b6..5d77a1a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.util.Collector;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,8 +74,10 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 	
 	private final String topicName;
 	private final DeserializationSchema<OUT> deserializationSchema;
+
+	private final LinkedMap pendingCheckpoints = new LinkedMap();
 	
-	protected transient ConsumerConfig consumerConfig;
+	private transient ConsumerConfig consumerConfig;
 	private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
 	private transient ConsumerConnector consumer;
 	
@@ -83,11 +86,8 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 	private transient long[] commitedOffsets; // maintain committed offsets, to avoid committing the same over and over again.
 	private transient long[] restoreState;
 	
-	private final LinkedMap pendingCheckpoints = new LinkedMap();
+	private volatile boolean running;
 	
-	// We set this in reachedEnd to carry it over to next()
-	private OUT nextElement = null;
-
 	/**
 	 *
 	 * For the @param consumerConfig, specify at least the "groupid" and "zookeeper.connect" string.
@@ -102,12 +102,12 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 		this.topicName = topicName;
 		this.deserializationSchema = deserializationSchema;
 		this.consumerConfig = consumerConfig;
-		if(consumerConfig.autoCommitEnable()) {
+		if (consumerConfig.autoCommitEnable()) {
 			throw new IllegalArgumentException("'auto.commit.enable' is set to 'true'. " +
 					"This source can only be used with auto commit disabled because the " +
 					"source is committing to zookeeper by itself (not using the KafkaConsumer).");
 		}
-		if(!consumerConfig.offsetsStorage().equals("zookeeper")) {
+		if (!consumerConfig.offsetsStorage().equals("zookeeper")) {
 			// we can currently only commit to ZK.
 			throw new IllegalArgumentException("The 'offsets.storage' has to be set to 'zookeeper' for this Source to work reliably");
 		}
@@ -164,49 +164,44 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 		}
 		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
 		
-		nextElement = null;
 		pendingCheckpoints.clear();
+		running = true;
 	}
 
 	@Override
-	public boolean reachedEnd() throws Exception {
-		if (nextElement != null) {
-			return false;
+	public void run(Object checkpointLock, Collector<OUT> collector) throws Exception {
+		if (iteratorToRead == null) {
+			throw new IllegalStateException("Kafka iterator not initialized properly.");
 		}
-
-		while (iteratorToRead.hasNext()) {
+		
+		while (running && iteratorToRead.hasNext()) {
 			MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
 			if(lastOffsets[message.partition()] >= message.offset()) {
 				LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
 				continue;
 			}
-			lastOffsets[message.partition()] = message.offset();
+			OUT next = deserializationSchema.deserialize(message.message());
 
-			OUT out = deserializationSchema.deserialize(message.message());
-			if (deserializationSchema.isEndOfStream(out)) {
+			if (deserializationSchema.isEndOfStream(next)) {
 				LOG.info("DeserializationSchema signaled end of stream for this source");
 				break;
 			}
 
-			nextElement = out;
+			// make the state update and the element emission atomic
+			synchronized (checkpointLock) {
+				lastOffsets[message.partition()] = message.offset();
+				collector.collect(next);
+			}
+
 			if (LOG.isTraceEnabled()) {
 				LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
 			}
-			break;
 		}
-
-		return nextElement == null;
 	}
 
 	@Override
-	public OUT next() throws Exception {
-		if (!reachedEnd()) {
-			OUT result = nextElement;
-			nextElement = null;
-			return result;
-		} else {
-			throw new RuntimeException("Source exhausted");
-		}
+	public void cancel() {
+		running = false;
 	}
 
 	@Override
@@ -222,15 +217,23 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 
 	@Override
 	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-
-		if(lastOffsets == null) {
+		if (lastOffsets == null) {
 			LOG.warn("State snapshot requested on not yet opened source. Returning null");
 			return null;
 		}
-		LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}", Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+		
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}",
+					Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+		}
 
 		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
-		pendingCheckpoints.put(checkpointId, currentOffsets);
+		
+		// the map may be asynchronously updated when committing offsets to ZooKeeper, so we synchronize
+		synchronized (pendingCheckpoints) {
+			pendingCheckpoints.put(checkpointId, currentOffsets);
+		}
+		
 		return currentOffsets;
 	}
 
@@ -239,8 +242,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 		LOG.info("The state will be restored to {} in the open() method", Arrays.toString(state));
 		this.restoreState = Arrays.copyOf(state, state.length);
 	}
-
-
+	
 	/**
 	 * Notification on completed checkpoints
 	 * @param checkpointId The ID of the checkpoint that has been completed.
@@ -248,27 +250,35 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 	@Override
 	public void commitCheckpoint(long checkpointId) {
 		LOG.info("Commit checkpoint {}", checkpointId);
-		final int posInMap = pendingCheckpoints.indexOf(checkpointId);
-		if(posInMap == -1) {
-			LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
-			return;
-		}
 
-		long[] checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
-		LOG.info("Committing offsets {} to ZooKeeper", Arrays.toString(checkpointOffsets));
-
-		// remove older checkpoints in map:
-		if(!pendingCheckpoints.isEmpty()) {
-			for(int i = 0; i < posInMap; i++) {
-				pendingCheckpoints.remove(0);
+		long[] checkpointOffsets;
+		
+		// the map may be asynchronously updated when snapshotting state, so we synchronize
+		synchronized (pendingCheckpoints) {
+			final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+			if (posInMap == -1) {
+				LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
+				return;
 			}
+	
+			checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
+			// remove older checkpoints in map:
+			if (!pendingCheckpoints.isEmpty()) {
+				for(int i = 0; i < posInMap; i++) {
+					pendingCheckpoints.remove(0);
+				}
+			} 
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Committing offsets {} to ZooKeeper", Arrays.toString(checkpointOffsets));
 		}
 
 		setOffsetsInZooKeeper(checkpointOffsets);
 	}
 
 	private void setOffsetsInZooKeeper(long[] offsets) {
-		for(int partition = 0; partition < offsets.length; partition++) {
+		for (int partition = 0; partition < offsets.length; partition++) {
 			long offset = offsets[partition];
 			if(offset != -1) {
 				setOffset(partition, offset);

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
index 9f59dce..0d107f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ConnectorSource.java
@@ -19,12 +19,12 @@ package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
-public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT>{
+public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT> {
 
 	private static final long serialVersionUID = 1L;
+	
 	protected DeserializationSchema<OUT> schema;
 
 	public ConnectorSource(DeserializationSchema<OUT> schema) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index ecbcee5..a9166c1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -52,7 +52,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 	private Map<String, Long> offsetOfFiles;
 	private Map<String, Long> modificationTimes;
 
-	private volatile boolean isRunning;
+	private volatile boolean isRunning = true;
 
 	public FileMonitoringFunction(String path, long interval, WatchType watchType) {
 		this.path = path;
@@ -64,7 +64,6 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 
 	@Override
 	public void run(Object checkpointLock, Collector<Tuple3<String, Long, Long>> collector) throws Exception {
-		isRunning = true;
 		FileSystem fileSystem = FileSystem.get(new URI(path));
 
 		while (isRunning) {
@@ -117,11 +116,7 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 			return true;
 		} else {
 			Long lastModification = modificationTimes.get(fileName);
-			if (lastModification == null) {
-				return false;
-			} else {
-				return lastModification >= modificationTime;
-			}
+			return lastModification != null && lastModification >= modificationTime;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
index 19ee7e1..ebbff9c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -40,7 +40,7 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
 	private transient InputSplitProvider provider;
 	private transient Iterator<InputSplit> splitIterator;
 
-	private volatile boolean isRunning;
+	private volatile boolean isRunning = true;
 
 	@SuppressWarnings("unchecked")
 	public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
@@ -65,7 +65,6 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
 
 	@Override
 	public void close() throws Exception {
-		super.close();
 		format.close();
 	}
 
@@ -118,8 +117,6 @@ public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
 
 	@Override
 	public void run(Object checkpointLock, Collector<OUT> out) throws Exception {
-		isRunning = true;
-
 		while (isRunning) {
 			OUT nextElement = serializer.createInstance();
 			nextElement =  format.nextRecord(nextElement);

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 0b323bc..838cfa4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -24,11 +24,12 @@ import java.util.Iterator;
 import org.apache.flink.util.Collector;
 
 public class FromElementsFunction<T> implements SourceFunction<T> {
+	
 	private static final long serialVersionUID = 1L;
 
 	private Iterable<T> iterable;
 
-	private volatile boolean isRunning;
+	private volatile boolean isRunning = true;
 
 	public FromElementsFunction(T... elements) {
 		this.iterable = Arrays.asList(elements);
@@ -44,7 +45,6 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 
 	@Override
 	public void run(Object checkpointLock, Collector<T> out) throws Exception {
-		isRunning = true;
 		Iterator<T> it = iterable.iterator();
 
 		while (isRunning && it.hasNext()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
index 4ee1334..7320d77 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -25,9 +25,9 @@ public class FromIteratorFunction<T> implements SourceFunction<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	Iterator<T> iterator;
+	private final Iterator<T> iterator;
 
-	private volatile boolean isRunning;
+	private volatile boolean isRunning = true;
 
 	public FromIteratorFunction(Iterator<T> iterator) {
 		this.iterator = iterator;
@@ -35,7 +35,6 @@ public class FromIteratorFunction<T> implements SourceFunction<T> {
 
 	@Override
 	public void run(Object checkpointLock, Collector<T> out) throws Exception {
-		isRunning = true;
 		while (isRunning && iterator.hasNext()) {
 			out.collect(iterator.next());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
index 97f5c06..61e1b7f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -31,7 +31,7 @@ public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunctio
 
 	private transient Iterator<T> iterator;
 
-	private volatile boolean isRunning;
+	private volatile boolean isRunning = true;
 
 	public FromSplittableIteratorFunction(SplittableIterator<T> iterator) {
 		this.fullIterator = iterator;
@@ -47,8 +47,6 @@ public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunctio
 
 	@Override
 	public void run(Object checkpointLock, Collector<T> out) throws Exception {
-		isRunning = true;
-
 		while (isRunning && iterator.hasNext()) {
 			out.collect(iterator.next());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
index c39a372..e9a739f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
@@ -25,7 +25,7 @@ package org.apache.flink.streaming.api.functions.source;
  * <p>This interface acts only as a marker to tell the system that this source may
  * be executed in parallel. When different parallel instances are required to perform
  * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
- * context, which revels information like the number of parallel tasks, and which parallel
+ * context, which reveals information like the number of parallel tasks, and which parallel
  * task the current instance is.
  *
  * @param <OUT> The type of the records produced by this source.

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
index fcbcbce..7cbf674 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
@@ -20,9 +20,14 @@ package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 
 /**
- * Base class for implementing a data source that has access to context information
- * (via {@link #getRuntimeContext()}) and additional life-cycle methods
- * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.
+ * Base class for implementing a parallel data source. Upon execution, the runtime will
+ * execute as many parallel instances of this function as the configured parallelism
+ * of the source.
+ * 
+ * <p>The data source has access to context information (such as the number of parallel
+ * instances of the source, and which parallel instance the current instance is)
+ * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
+ * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}).</p>
  *
  * @param <OUT> The type of the records produced by this source.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index 8e5c3b3..c5800f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -45,7 +45,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	private static final int CONNECTION_TIMEOUT_TIME = 0;
 	private static final int CONNECTION_RETRY_SLEEP = 1000;
 
-	private volatile boolean isRunning = false;
+	private volatile boolean isRunning;
 
 	public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
 		this.hostname = hostname;
@@ -60,6 +60,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 		super.open(parameters);
 		socket = new Socket();
 		socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
+		isRunning = true;
 	}
 
 	@Override
@@ -68,7 +69,6 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	}
 
 	public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
-		isRunning = true;
 		try {
 			StringBuffer buffer = new StringBuffer();
 			BufferedReader reader = new BufferedReader(new InputStreamReader(

http://git-wip-us.apache.org/repos/asf/flink/blob/f4195ac0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 186fb3f..35b5341 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -86,7 +86,7 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 
 	@Override
 	public void cancel() {
-		streamOperator.cancel();
 		super.cancel();
+		streamOperator.cancel();
 	}
 }