Posted to commits@flink.apache.org by se...@apache.org on 2015/08/14 13:26:09 UTC

[1/4] flink git commit: [FLINK-2509] [hotfix] Address pull request comments for ClassLoaderUtils

Repository: flink
Updated Branches:
  refs/heads/master 26c6447e9 -> 51872d73b


[FLINK-2509] [hotfix] Address pull request comments for ClassLoaderUtils

This is a correcting commit; the wrong commit was pushed earlier.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cc8d66f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cc8d66f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cc8d66f

Branch: refs/heads/master
Commit: 4cc8d66fbf7d288581d1912527f54c3d0f1943d7
Parents: 26c6447
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 13 17:00:18 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 14 12:19:29 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java | 3 ---
 .../java/org/apache/flink/streaming/api/graph/StreamConfig.java  | 4 ++--
 2 files changed, 2 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4cc8d66f/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
index d5f3c9e..654001d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ClassLoaderUtilsTest.java
@@ -85,8 +85,6 @@ public class ClassLoaderUtilsTest {
 			assertTrue(info.indexOf(validJar.getAbsolutePath() + "' (valid") > 0);
 			assertTrue(info.indexOf(invalidJar.getAbsolutePath() + "' (invalid JAR") > 0);
 			assertTrue(info.indexOf(nonExisting.getAbsolutePath() + "' (missing") > 0);
-
-			System.out.println(info);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -107,7 +105,6 @@ public class ClassLoaderUtilsTest {
 	@Test
 	public void testWithAppClassLoader() {
 		try {
-			// must return something when invoked with 'null'
 			String result = ClassLoaderUtil.getUserCodeClassLoaderInfo(ClassLoader.getSystemClassLoader());
 			assertTrue(result.toLowerCase().contains("system classloader"));
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cc8d66f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index a8486d3..4f19db6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -213,8 +213,8 @@ public class StreamConfig implements Serializable {
 			String exceptionMessage = "Cannot load user class: " + e.getMessage()
 					+ "\nClassLoader info: " + classLoaderInfo + 
 					(loadableDoubleCheck ? 
-							"Class was actually found in classloader - deserialization issue." :
-							"Class not resolveable through given classloader.");
+							"\nClass was actually found in classloader - deserialization issue." :
+							"\nClass not resolvable through given classloader.");
 			
 			throw new StreamTaskException(exceptionMessage);
 		}
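
For readers skimming the hunk above: the `loadableDoubleCheck` flag distinguishes a plain "class not found" failure from a deserialization problem where the class is actually resolvable. A minimal sketch of how such a double-check could be performed — the helper name and exception handling are illustrative, not Flink's actual implementation:

    // Hypothetical helper: returns true if the classloader can at least resolve
    // the class, meaning the original failure was a deserialization issue rather
    // than a missing class. Linkage errors are ignored here for brevity.
    private static boolean isClassResolvable(String className, ClassLoader cl) {
        try {
            Class.forName(className, false, cl);
            return true;
        }
        catch (ClassNotFoundException e) {
            return false;
        }
    }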


[3/4] flink git commit: [FLINK-2519] [streaming] Make checkpoint alignment aware of finished partitions.

Posted by se...@apache.org.
[FLINK-2519] [streaming] Make checkpoint alignment aware of finished partitions.
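
In essence (see the BarrierBuffer diff below), end-of-partition events now count toward barrier alignment, so a checkpoint can still complete when some upstream partitions have already finished. A condensed, illustrative form of the new completion check — not the literal Flink code — is:

    // Condensed sketch: alignment for the current checkpoint completes once every
    // input channel has either delivered its barrier or reached end-of-partition.
    boolean isAlignmentComplete() {
        return numBarriersReceived + numClosedChannels == totalNumberOfInputChannels;
    }

The diff additionally aborts a pending alignment (releaseBlocks()) when an EndOfPartitionEvent arrives on a still-open channel, because that checkpoint can no longer complete.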


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1d1bd0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1d1bd0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1d1bd0a

Branch: refs/heads/master
Commit: e1d1bd0a224b32f6a488e400f5f07e4ab4b65869
Parents: 06e2da3
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 14 12:16:26 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 14 12:34:50 2015 +0200

----------------------------------------------------------------------
 .../network/partition/consumer/InputGate.java   |  40 +++++
 .../streaming/runtime/io/BarrierBuffer.java     |  70 +++++---
 .../streaming/runtime/io/BarrierBufferTest.java | 169 ++++++++++++-------
 .../runtime/io/BarrierTrackerTest.java          |  63 ++-----
 .../streaming/runtime/io/MockInputGate.java     |  94 +++++++++++
 5 files changed, 294 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1d1bd0a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 1f42cfa..f18c7e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -23,6 +23,46 @@ import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
 
+/**
+ * An input gate consumes one or more partitions of a single produced intermediate result.
+ *
+ * <p> Each intermediate result is partitioned over its producing parallel subtasks; each of these
+ * partitions is furthermore partitioned into one or more subpartitions.
+ *
+ * <p> As an example, consider a map-reduce program, where the map operator produces data and the
+ * reduce operator consumes the produced data.
+ *
+ * <pre>
+ * +-----+              +---------------------+              +--------+
+ * | Map | = produce => | Intermediate Result | <= consume = | Reduce |
+ * +-----+              +---------------------+              +--------+
+ * </pre>
+ *
+ * <p> When deploying such a program in parallel, the intermediate result will be partitioned over its
+ * producing parallel subtasks; each of these partitions is furthermore partitioned into one or more
+ * subpartitions.
+ *
+ * <pre>
+ *                            Intermediate result
+ *               +-----------------------------------------+
+ *               |                      +----------------+ |              +-----------------------+
+ * +-------+     | +-------------+  +=> | Subpartition 1 | | <=======+=== | Input Gate | Reduce 1 |
+ * | Map 1 | ==> | | Partition 1 | =|   +----------------+ |         |    +-----------------------+
+ * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+    |
+ *               |                      +----------------+ |    |    | Subpartition request
+ *               |                                         |    |    |
+ *               |                      +----------------+ |    |    |
+ * +-------+     | +-------------+  +=> | Subpartition 1 | | <==+====+
+ * | Map 2 | ==> | | Partition 2 | =|   +----------------+ |    |         +-----------------------+
+ * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+======== | Input Gate | Reduce 2 |
+ *               |                      +----------------+ |              +-----------------------+
+ *               +-----------------------------------------+
+ * </pre>
+ *
+ * <p> In the above example, two map subtasks produce the intermediate result in parallel, resulting
+ * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two
+ * subpartitions -- one for each parallel reduce subtask.
+ */
 public interface InputGate {
 
 	int getNumberOfInputChannels();
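
Since the new Javadoc above describes how an input gate consumes the partitions of an intermediate result, a small hypothetical polling loop over the interface may help; it only uses methods that appear elsewhere in this commit, and the throws clause is an assumption rather than the interface's real signature:

    // Hypothetical consumer loop over an InputGate; not taken from Flink itself.
    void drain(InputGate gate) throws Exception {
        gate.requestPartitions();
        while (!gate.isFinished()) {
            BufferOrEvent boe = gate.getNextBufferOrEvent();
            if (boe == null) {
                continue;                                   // nothing available yet
            }
            if (boe.isBuffer()) {
                // hand the buffer to the consuming operator (details omitted)
            }
            else if (boe.getEvent() instanceof EndOfPartitionEvent) {
                // the producing subtask behind this channel has finished
            }
        }
    }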

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d1bd0a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index fd896c9..0bcdb74 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
@@ -68,7 +69,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private long currentCheckpointId = -1L;
 
 	/** The number of received barriers (= number of blocked/buffered channels) */
-	private long numReceivedBarriers;
+	private int numBarriersReceived;
+	
+	/** The number of already closed channels */
+	private int numClosedChannels;
 	
 	/** Flag to indicate whether we have drawn all available input */
 	private boolean endOfStream;
@@ -99,32 +103,38 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		while (true) {
 			// process buffered BufferOrEvents before grabbing new ones
 			BufferOrEvent next;
-			if (currentBuffered != null) {
+			if (currentBuffered == null) {
+				next = inputGate.getNextBufferOrEvent();
+			}
+			else {
 				next = currentBuffered.getNext();
 				if (next == null) {
-					currentBuffered.cleanup();
-					currentBuffered = queuedBuffered.pollFirst();
-					if (currentBuffered != null) {
-						currentBuffered.open();
-					}
+					completeBufferedSequence();
 					return getNextNonBlocked();
 				}
 			}
-			else {
-				next = inputGate.getNextBufferOrEvent();
-			}
 			
 			if (next != null) {
 				if (isBlocked(next.getChannelIndex())) {
 					// if the channel is blocked, we just store the BufferOrEvent
 					bufferSpiller.add(next);
 				}
-				else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+				else if (next.isBuffer()) {
 					return next;
 				}
-				else if (!endOfStream) {
-					// process barriers only if there is a chance of the checkpoint completing
-					processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
+				else if (next.getEvent().getClass() == CheckpointBarrier.class) {
+					if (!endOfStream) {
+						// process barriers only if there is a chance of the checkpoint completing
+						processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
+					}
+				}
+				else {
+					if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
+						numClosedChannels++;
+						// no chance to complete this checkpoint
+						releaseBlocks();
+					}
+					return next;
 				}
 			}
 			else if (!endOfStream) {
@@ -139,10 +149,18 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 	}
 	
+	private void completeBufferedSequence() throws IOException {
+		currentBuffered.cleanup();
+		currentBuffered = queuedBuffered.pollFirst();
+		if (currentBuffered != null) {
+			currentBuffered.open();
+		}
+	}
+	
 	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
 		final long barrierId = receivedBarrier.getId();
 
-		if (numReceivedBarriers > 0) {
+		if (numBarriersReceived > 0) {
 			// subsequent barrier of a checkpoint.
 			if (barrierId == currentCheckpointId) {
 				// regular case
@@ -174,7 +192,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 
 		// check if we have all barriers
-		if (numReceivedBarriers == totalNumberOfInputChannels) {
+		if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Received all barrier, triggering checkpoint {} at {}",
 						receivedBarrier.getId(), receivedBarrier.getTimestamp());
@@ -232,7 +250,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private void onBarrier(int channelIndex) throws IOException {
 		if (!blockedChannels[channelIndex]) {
 			blockedChannels[channelIndex] = true;
-			numReceivedBarriers++;
+			numBarriersReceived++;
 			
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Received barrier from channel " + channelIndex);
@@ -255,7 +273,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		for (int i = 0; i < blockedChannels.length; i++) {
 			blockedChannels[i] = false;
 		}
-		numReceivedBarriers = 0;
+		numBarriersReceived = 0;
 
 		if (currentBuffered == null) {
 			// common case: no more buffered data
@@ -266,13 +284,14 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 		else {
 			// uncommon case: buffered data pending
-			// push back the pending data
-			queuedBuffered.addFirst(currentBuffered);
+			// push back the pending data, if we have any
 			
-			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one 
-			currentBuffered = bufferSpiller.rollOverWithNewBuffer();
-			if (currentBuffered != null) {
-				currentBuffered.open();
+			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
+			BufferSpiller.SpilledBufferOrEventSequence bufferedNow = bufferSpiller.rollOverWithNewBuffer();
+			if (bufferedNow != null) {
+				bufferedNow.open();
+				queuedBuffered.addFirst(currentBuffered);
+				currentBuffered = bufferedNow;
 			}
 		}
 	}
@@ -296,6 +315,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	
 	@Override
 	public String toString() {
-		return String.format("last checkpoint: %d, current barriers: %d", currentCheckpointId, numReceivedBarriers);
+		return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
+				currentCheckpointId, numBarriersReceived, numClosedChannels);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d1bd0a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index dd4d395..a95839a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -19,14 +19,12 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
@@ -35,10 +33,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
-import java.util.ArrayDeque;
 import java.util.Arrays;
-import java.util.List;
-import java.util.Queue;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -84,7 +79,7 @@ public class BarrierBufferTest {
 					createEndOfPartition(0)
 			};
 
-			MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
 			for (BufferOrEvent boe : sequence) {
@@ -117,7 +112,7 @@ public class BarrierBufferTest {
 					createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2)
 			};
 
-			MockInputGate gate = new MockInputGate(4, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
 			for (BufferOrEvent boe : sequence) {
@@ -154,7 +149,7 @@ public class BarrierBufferTest {
 					createBuffer(0), createEndOfPartition(0)
 			};
 
-			MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
@@ -220,7 +215,7 @@ public class BarrierBufferTest {
 					createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)
 			};
 
-			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
@@ -313,7 +308,7 @@ public class BarrierBufferTest {
 					createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
 			};
 
-			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
@@ -402,7 +397,7 @@ public class BarrierBufferTest {
 					createBuffer(0), createEndOfPartition(0)
 			};
 
-			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
@@ -455,11 +450,15 @@ public class BarrierBufferTest {
 			check(sequence[42], buffer.getNextNonBlocked());
 			check(sequence[45], buffer.getNextNonBlocked());
 			check(sequence[46], buffer.getNextNonBlocked());
+			
+			// abort checkpoint 5 (end of partition)
+			check(sequence[37], buffer.getNextNonBlocked());
+			
+			// start checkpoint 6 alignment
 			check(sequence[47], buffer.getNextNonBlocked());
 			check(sequence[48], buffer.getNextNonBlocked());
 			
 			// end of input, emit remainder
-			check(sequence[37], buffer.getNextNonBlocked());
 			check(sequence[43], buffer.getNextNonBlocked());
 			check(sequence[44], buffer.getNextNonBlocked());
 			
@@ -504,7 +503,7 @@ public class BarrierBufferTest {
 					createBuffer(0), createEndOfPartition(0)
 			};
 
-			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
@@ -534,13 +533,13 @@ public class BarrierBufferTest {
 			check(sequence[16], buffer.getNextNonBlocked());
 			check(sequence[19], buffer.getNextNonBlocked());
 			check(sequence[20], buffer.getNextNonBlocked());
-			check(sequence[23], buffer.getNextNonBlocked());
-			check(sequence[24], buffer.getNextNonBlocked());
-
-			// end of input, emit remainder
+			
+			// checkpoint 3 aborted (end of partition)
 			check(sequence[18], buffer.getNextNonBlocked());
 			check(sequence[21], buffer.getNextNonBlocked());
 			check(sequence[22], buffer.getNextNonBlocked());
+			check(sequence[23], buffer.getNextNonBlocked());
+			check(sequence[24], buffer.getNextNonBlocked());
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
@@ -587,7 +586,7 @@ public class BarrierBufferTest {
 					createBuffer(0), createEndOfPartition(0)
 			};
 
-			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
@@ -620,16 +619,16 @@ public class BarrierBufferTest {
 			check(sequence[18], buffer.getNextNonBlocked());
 			check(sequence[22], buffer.getNextNonBlocked());
 			
-			// align remainder
+			// align checkpoint 4 remainder
 			check(sequence[25], buffer.getNextNonBlocked());
 			check(sequence[26], buffer.getNextNonBlocked());
-			check(sequence[29], buffer.getNextNonBlocked());
-			check(sequence[30], buffer.getNextNonBlocked());
 			
-			// end of input, emit remainder
+			// checkpoint 4 aborted (due to end of partition)
 			check(sequence[24], buffer.getNextNonBlocked());
 			check(sequence[27], buffer.getNextNonBlocked());
 			check(sequence[28], buffer.getNextNonBlocked());
+			check(sequence[29], buffer.getNextNonBlocked());
+			check(sequence[30], buffer.getNextNonBlocked());
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
@@ -688,7 +687,7 @@ public class BarrierBufferTest {
 					createBuffer(0), createEndOfPartition(0)
 			};
 
-			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
 			// checkpoint 1
@@ -759,7 +758,7 @@ public class BarrierBufferTest {
 					createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
 			};
 
-			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
@@ -795,6 +794,88 @@ public class BarrierBufferTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testStartAlignmentWithClosedChannels() {
+		try {
+			BufferOrEvent[] sequence = {
+					// close some channels immediately 
+					createEndOfPartition(2), createEndOfPartition(1),
+
+					// checkpoint without blocked data
+					createBuffer(0), createBuffer(0), createBuffer(3),
+					createBarrier(2, 3), createBarrier(2, 0),
+
+					// checkpoint with blocked data
+					createBuffer(3), createBuffer(0),
+					createBarrier(3, 3),
+					createBuffer(3), createBuffer(0),
+					createBarrier(3, 0),
+
+					// empty checkpoint
+					createBarrier(4, 0), createBarrier(4, 3),
+					
+					// some data, one channel closes
+					createBuffer(0), createBuffer(0), createBuffer(3),
+					createEndOfPartition(0),
+					
+					// checkpoint on last remaining channel
+					createBuffer(3),
+					createBarrier(5, 3),
+					createBuffer(3),
+					createEndOfPartition(3)
+			};
+
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
+			
+			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+			
+			// pre checkpoint 2
+			check(sequence[0], buffer.getNextNonBlocked());
+			check(sequence[1], buffer.getNextNonBlocked());
+			check(sequence[2], buffer.getNextNonBlocked());
+			check(sequence[3], buffer.getNextNonBlocked());
+			check(sequence[4], buffer.getNextNonBlocked());
+
+			// checkpoint 3 alignment
+			check(sequence[7], buffer.getNextNonBlocked());
+			assertEquals(2L, buffer.getCurrentCheckpointId());
+			check(sequence[8], buffer.getNextNonBlocked());
+			check(sequence[11], buffer.getNextNonBlocked());
+
+			// checkpoint 3 buffered
+			check(sequence[10], buffer.getNextNonBlocked());
+			assertEquals(3L, buffer.getCurrentCheckpointId());
+
+			// after checkpoint 4
+			check(sequence[15], buffer.getNextNonBlocked());
+			assertEquals(4L, buffer.getCurrentCheckpointId());
+			check(sequence[16], buffer.getNextNonBlocked());
+			check(sequence[17], buffer.getNextNonBlocked());
+			check(sequence[18], buffer.getNextNonBlocked());
+
+			check(sequence[19], buffer.getNextNonBlocked());
+			check(sequence[21], buffer.getNextNonBlocked());
+			assertEquals(5L, buffer.getCurrentCheckpointId());
+			check(sequence[22], buffer.getNextNonBlocked());
+
+			assertNull(buffer.getNextNonBlocked());
+			assertNull(buffer.getNextNonBlocked());
+			
+			buffer.cleanup();
+			
+			checkNoTempFilesRemain();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testEndOfStreamWhileCheckpoint() {
+		
+	}
 
 	// ------------------------------------------------------------------------
 	//  Utils
@@ -846,46 +927,6 @@ public class BarrierBufferTest {
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 
-	private static class MockInputGate implements InputGate {
-
-		private final int numChannels;
-		private final Queue<BufferOrEvent> boes;
-
-		public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
-			this.numChannels = numChannels;
-			this.boes = new ArrayDeque<BufferOrEvent>(boes);
-		}
-
-		@Override
-		public int getNumberOfInputChannels() {
-			return numChannels;
-		}
-
-		@Override
-		public boolean isFinished() {
-			return boes.isEmpty();
-		}
-
-		@Override
-		public void requestPartitions() {}
-
-		@Override
-		public BufferOrEvent getNextBufferOrEvent() {
-			return boes.poll();
-		}
-
-		@Override
-		public void sendTaskEvent(TaskEvent event) {}
-
-		@Override
-		public void registerListener(EventListener<InputGate> listener) {}
-
-		@Override
-		public int getPageSize() {
-			return PAGE_SIZE;
-		}
-	}
-
 	private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
 		
 		private long nextExpectedCheckpointId = -1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d1bd0a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index fb61633..c6010d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,20 +19,15 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.junit.Test;
 
-import java.util.ArrayDeque;
 import java.util.Arrays;
-import java.util.List;
-import java.util.Queue;
 
 import static org.junit.Assert.*;
 
@@ -41,12 +36,14 @@ import static org.junit.Assert.*;
  */
 public class BarrierTrackerTest {
 	
+	private static final int PAGE_SIZE = 512;
+	
 	@Test
 	public void testSingleChannelNoBarriers() {
 		try {
 			BufferOrEvent[] sequence = { createBuffer(0), createBuffer(0), createBuffer(0) };
 
-			MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
 			BarrierTracker tracker = new BarrierTracker(gate);
 
 			for (BufferOrEvent boe : sequence) {
@@ -70,7 +67,7 @@ public class BarrierTrackerTest {
 					createBuffer(1), createBuffer(1), createBuffer(2)
 			};
 
-			MockInputGate gate = new MockInputGate(4, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
 			BarrierTracker tracker = new BarrierTracker(gate);
 
 			for (BufferOrEvent boe : sequence) {
@@ -99,7 +96,7 @@ public class BarrierTrackerTest {
 					createBuffer(0)
 			};
 
-			MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
 			BarrierTracker tracker = new BarrierTracker(gate);
 
 			CheckpointSequenceValidator validator =
@@ -134,7 +131,7 @@ public class BarrierTrackerTest {
 					createBuffer(0)
 			};
 
-			MockInputGate gate = new MockInputGate(1, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
 			BarrierTracker tracker = new BarrierTracker(gate);
 
 			CheckpointSequenceValidator validator =
@@ -178,7 +175,7 @@ public class BarrierTrackerTest {
 					createBuffer(0)
 			};
 
-			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierTracker tracker = new BarrierTracker(gate);
 
 			CheckpointSequenceValidator validator =
@@ -226,7 +223,7 @@ public class BarrierTrackerTest {
 					createBuffer(0)
 			};
 
-			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierTracker tracker = new BarrierTracker(gate);
 
 			CheckpointSequenceValidator validator =
@@ -310,7 +307,7 @@ public class BarrierTrackerTest {
 					createBuffer(1), createBuffer(0), createBuffer(2)
 			};
 
-			MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
+			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierTracker tracker = new BarrierTracker(gate);
 
 			CheckpointSequenceValidator validator =
@@ -348,47 +345,7 @@ public class BarrierTrackerTest {
 	// ------------------------------------------------------------------------
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
-
-	private static class MockInputGate implements InputGate {
-
-		private final int numChannels;
-		private final Queue<BufferOrEvent> boes;
-
-		public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
-			this.numChannels = numChannels;
-			this.boes = new ArrayDeque<BufferOrEvent>(boes);
-		}
-
-		@Override
-		public int getNumberOfInputChannels() {
-			return numChannels;
-		}
-
-		@Override
-		public boolean isFinished() {
-			return boes.isEmpty();
-		}
-
-		@Override
-		public void requestPartitions() {}
-
-		@Override
-		public BufferOrEvent getNextBufferOrEvent() {
-			return boes.poll();
-		}
-
-		@Override
-		public void sendTaskEvent(TaskEvent event) {}
-
-		@Override
-		public void registerListener(EventListener<InputGate> listener) {}
-
-		@Override
-		public int getPageSize() {
-			return 2;
-		}
-	}
-
+	
 	private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
 
 		private final long[] checkpointIDs;

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d1bd0a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
new file mode 100644
index 0000000..cb8a058
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+
+public class MockInputGate implements InputGate {
+
+	private final int pageSize;
+	
+	private final int numChannels;
+	
+	private final Queue<BufferOrEvent> boes;
+
+	private final boolean[] closed;
+	
+	private int closedChannels;
+
+	
+	public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> boes) {
+		this.pageSize = pageSize;
+		this.numChannels = numChannels;
+		this.boes = new ArrayDeque<BufferOrEvent>(boes);
+		this.closed = new boolean[numChannels];
+	}
+
+	@Override
+	public int getPageSize() {
+		return pageSize;
+	}
+	
+	@Override
+	public int getNumberOfInputChannels() {
+		return numChannels;
+	}
+
+	@Override
+	public boolean isFinished() {
+		return boes.isEmpty();
+	}
+
+	@Override
+	public BufferOrEvent getNextBufferOrEvent() {
+		BufferOrEvent next = boes.poll();
+		if (next == null) {
+			return null;
+		}
+		
+		int channelIdx = next.getChannelIndex();
+		if (closed[channelIdx]) {
+			throw new RuntimeException("Inconsistent: Channel " + channelIdx
+					+ " has data even though it is already closed.");
+		}
+		if (next.isEvent() && next.getEvent() instanceof EndOfPartitionEvent) {
+			closed[channelIdx] = true;
+			closedChannels++;
+		}
+		return next;
+	}
+
+	@Override
+	public void requestPartitions() {}
+
+	@Override
+	public void sendTaskEvent(TaskEvent event) {}
+
+	@Override
+	public void registerListener(EventListener<InputGate> listener) {}
+	
+}
\ No newline at end of file


[2/4] flink git commit: [FLINK-2515] [job manager] Checkpoint coordinator triggers checkpoints only when tasks are running.

Posted by se...@apache.org.
[FLINK-2515] [job manager] Checkpoint coordinator triggers checkpoints only when tasks are running.
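
The guard itself is a one-line change in CheckpointCoordinator (see the diff below): a checkpoint is only triggered when every task that must receive the trigger message has a RUNNING execution attempt. Condensed, the precondition amounts to the following (illustrative, not the full method):

    // Condensed from the diff below: abort the checkpoint unless every trigger
    // task currently has a RUNNING execution attempt.
    for (ExecutionVertex vertex : tasksToTrigger) {
        Execution ee = vertex.getCurrentExecutionAttempt();
        if (ee == null || ee.getState() != ExecutionState.RUNNING) {
            // log "task is not being executed at the moment" and abort the checkpoint
            return false;
        }
    }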


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06e2da35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06e2da35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06e2da35

Branch: refs/heads/master
Commit: 06e2da352fb63f7922f634e6aaf5381d89de57a5
Parents: 4cc8d66
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 13 16:43:53 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 14 12:34:50 2015 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  3 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 54 +++++++++++++++++++-
 .../checkpoint/CheckpointStateRestoreTest.java  | 14 +++--
 3 files changed, 63 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06e2da35/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9694132..de83ad9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -23,6 +23,7 @@ import akka.actor.PoisonPill;
 import akka.actor.Props;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -224,7 +225,7 @@ public class CheckpointCoordinator {
 			ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
 			for (int i = 0; i < tasksToTrigger.length; i++) {
 				Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
-				if (ee != null) {
+				if (ee != null && ee.getState() == ExecutionState.RUNNING) {
 					triggerIDs[i] = ee.getAttemptId();
 				} else {
 					LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",

http://git-wip-us.apache.org/repos/asf/flink/blob/06e2da35/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index c02d301..cfe77d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -38,7 +39,7 @@ import java.util.List;
  */
 public class CheckpointCoordinatorTest {
 	
-	ClassLoader cl = Thread.currentThread().getContextClassLoader();
+	private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();
 	
 	@Test
 	public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
@@ -83,6 +84,50 @@ public class CheckpointCoordinatorTest {
 	}
 
 	@Test
+	public void testCheckpointAbortsIfTriggerTasksAreFinished() {
+		try {
+			final JobID jid = new JobID();
+			final long timestamp = System.currentTimeMillis();
+
+			// create some mock Execution vertices that receive the checkpoint trigger messages
+			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
+			ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
+			ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2, ExecutionState.FINISHED);
+
+			// create some mock Execution vertices that need to ack the checkpoint
+			final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
+			ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
+			ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
+
+			// set up the coordinator and validate the initial state
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid, 1, 600000,
+					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
+					new ExecutionVertex[] { ackVertex1, ackVertex2 },
+					new ExecutionVertex[] {}, cl );
+
+			// nothing should be happening
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			// trigger the first checkpoint. this should not succeed
+			assertFalse(coord.triggerCheckpoint(timestamp));
+
+			// still, nothing should be happening
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
 		try {
 			final JobID jid = new JobID();
@@ -609,10 +654,15 @@ public class CheckpointCoordinatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
+		return mockExecutionVertex(attemptID, ExecutionState.RUNNING);
+	}
+	
+	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, ExecutionState state) {
 		final Execution exec = mock(Execution.class);
 		when(exec.getAttemptId()).thenReturn(attemptID);
+		when(exec.getState()).thenReturn(state);
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);

http://git-wip-us.apache.org/repos/asf/flink/blob/06e2da35/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index c23aaca..902eb4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -45,13 +45,13 @@ import static org.mockito.Mockito.*;
  */
 public class CheckpointStateRestoreTest {
 	
-	ClassLoader cl = Thread.currentThread().getContextClassLoader();
+	private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();
 	
 	@Test
 	public void testSetState() {
 		try {
 			final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
-					new LocalStateHandle(new SerializableObject()));
+					new LocalStateHandle<SerializableObject>(new SerializableObject()));
 			
 			final JobID jid = new JobID();
 			final JobVertexID statefulId = new JobVertexID();
@@ -120,7 +120,7 @@ public class CheckpointStateRestoreTest {
 	public void testStateOnlyPartiallyAvailable() {
 		try {
 			final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
-					new LocalStateHandle(new SerializableObject()));
+					new LocalStateHandle<SerializableObject>(new SerializableObject()));
 
 			final JobID jid = new JobID();
 			final JobVertexID statefulId = new JobVertexID();
@@ -208,11 +208,15 @@ public class CheckpointStateRestoreTest {
 	}
 	
 	// ------------------------------------------------------------------------
-	
+
 	private Execution mockExecution() {
+		return mockExecution(ExecutionState.RUNNING);
+	}
+	
+	private Execution mockExecution(ExecutionState state) {
 		Execution mock = mock(Execution.class);
 		when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID());
-		when(mock.getState()).thenReturn(ExecutionState.CREATED);
+		when(mock.getState()).thenReturn(state);
 		return mock;
 	}
 	


[4/4] flink git commit: [FLINK-2517] [docs] Minor fix to streaming guide

Posted by se...@apache.org.
[FLINK-2517] [docs] Minor fix to streaming guide

This closes #1013


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51872d73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51872d73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51872d73

Branch: refs/heads/master
Commit: 51872d73b83a86603fd1f2e8e481d3cceb755e38
Parents: e1d1bd0
Author: Nezih Yigitbasi <ny...@netflix.com>
Authored: Fri Aug 14 01:34:11 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 14 12:34:51 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51872d73/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index b20482e..e375dab 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1597,7 +1597,7 @@ A class providing an interface for sending data to Kafka.
 
 The following have to be provided to the `KafkaSink(…)` constructor, in this order:
 
-1. Zookeeper hostname
+1. Broker address (in hostname:port format, can be a comma separated list)
 2. The topic name
 3. Serialization schema
 
@@ -1606,12 +1606,12 @@ Example:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.addSink(new KafkaSink<String>("localhost:2181", "test", new SimpleStringSchema()));
+stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-stream.addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringSchema))
+stream.addSink(new KafkaSink[String]("localhost:9092", "test", new SimpleStringSchema))
 {% endhighlight %}
 </div>
 </div>
@@ -1633,7 +1633,7 @@ public KafkaSink(String zookeeperAddress, String topicId, Properties producerCon
 </div>
 </div>
 
-If this constructor is used, the user needs to make sure to set the broker with the "metadata.broker.list" property. Also the serializer configuration should be left default, the serialization should be set via SerializationSchema.
+If this constructor is used, the user needs to make sure to set the broker(s) with the "metadata.broker.list" property. Also the serializer configuration should be left default, the serialization should be set via SerializationSchema.
 
 More about Kafka can be found [here](https://kafka.apache.org/documentation.html).
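
As a small addendum to the corrected guide text above, a Java usage sketch mirroring the guide's own example; the broker addresses are placeholders:

    // Illustrative only: the first constructor argument is the broker address
    // (hostname:port, optionally a comma-separated list), not the ZooKeeper address.
    stream.addSink(new KafkaSink<String>(
            "broker1:9092,broker2:9092",          // Kafka broker list (placeholder hosts)
            "test",                               // topic name
            new SimpleStringSchema()));           // serialization schema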