Posted to commits@flink.apache.org by se...@apache.org on 2015/07/08 20:31:08 UTC

[1/9] flink git commit: [FLINK-2124] [streaming] Handle 'fromElements()' for non-serializable elements. Clean up code and add proper tests.

Repository: flink
Updated Branches:
  refs/heads/master 8acc0d2f1 -> e451a4ae0


[FLINK-2124] [streaming] Handle 'fromElements()' for non-serializable elements. Clean up code and add proper tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28713a29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28713a29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28713a29

Branch: refs/heads/master
Commit: 28713a2962d97bf8809ad23d96d395d998a98be8
Parents: e4a030c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 8 17:56:04 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java | 146 +++++++-------
 .../functions/source/FromElementsFunction.java  | 111 +++++++----
 .../api/functions/source/SourceFunction.java    |   9 +-
 .../api/StreamExecutionEnvironmentTest.java     |  59 +++---
 .../api/functions/FromElementsFunctionTest.java | 191 +++++++++++++++++++
 .../api/functions/ListSourceContext.java        |  50 +++++
 .../api/scala/StreamExecutionEnvironment.scala  |  38 ++--
 7 files changed, 442 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
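
A minimal sketch of what this commit enables (the Point POJO and job below are illustrative, not part of the commit): elements passed to fromElements() are serialized eagerly with Flink's own type serializer, so they no longer need to be java.io.Serializable themselves.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FromElementsExample {

	// A POJO that does NOT implement java.io.Serializable. Before this change,
	// shipping it inside FromElementsFunction via Java serialization would fail.
	public static class Point {
		public long x;
		public int y;

		public Point() {}

		public Point(long x, int y) {
			this.x = x;
			this.y = y;
		}
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// The elements are serialized eagerly with Flink's type serializer,
		// so the source function itself remains Java-serializable.
		DataStream<Point> points = env.fromElements(new Point(1, 2), new Point(3, 4));
		points.print();

		env.execute("fromElements with non-Serializable elements");
	}
}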


http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f98a9f0..58348e3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -65,6 +65,8 @@ import org.apache.flink.types.StringValue;
 import org.apache.flink.util.SplittableIterator;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -432,7 +434,7 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Creates a new data stream that contains a sequence of numbers. This is a parallel source,
 	 * if you manually set the parallelism to {@code 1}
-	 * (using {@link org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.setParallelism()})
+	 * (using {@link org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setParallelism(int)})
 	 * the generated sequence of elements is in order.
 	 *
 	 * @param from
@@ -467,35 +469,38 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
 		if (data.length == 0) {
-			throw new IllegalArgumentException(
-					"fromElements needs at least one element as argument");
+			throw new IllegalArgumentException("fromElements needs at least one element as argument");
 		}
 
-		TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]);
-
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
-
-		return addSource(function, "Elements source").returns(typeInfo);
+		TypeInformation<OUT> typeInfo;
+		try {
+			typeInfo = TypeExtractor.getForObject(data[0]);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
+				+ "; please specify the TypeInformation manually via "
+				+ "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
+		}
+		return fromCollection(Arrays.asList(data), typeInfo);
 	}
 
 	/**
 	 * Creates a data stream from the given non-empty collection. The type of the data stream is that of the
 	 * elements in the collection.
 	 *
-	 * <p>
-	 * The framework will try and determine the exact type from the collection elements. In case of generic
+	 * <p>The framework will try to determine the exact type from the collection elements. In case of generic
 	 * elements, it may be necessary to manually supply the type information via
-	 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
-	 * <p>
+	 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.</p>
 	 *
-	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
-	 * degree of parallelism one.
+	 * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
+	 * parallelism of one.</p>
 	 *
 	 * @param data
-	 * 		The collection of elements to create the data stream from
+	 * 		The collection of elements to create the data stream from.
 	 * @param <OUT>
-	 * 		The type of the returned data stream
-	 * @return The data stream representing the given collection
+	 *     The generic type of the returned data stream.
+	 * @return
+	 *     The data stream representing the given collection
 	 */
 	public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
 		Preconditions.checkNotNull(data, "Collection must not be null");
@@ -503,16 +508,28 @@ public abstract class StreamExecutionEnvironment {
 			throw new IllegalArgumentException("Collection must not be empty");
 		}
 
-		TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
-		checkCollection(data, typeInfo.getTypeClass());
+		OUT first = data.iterator().next();
+		if (first == null) {
+			throw new IllegalArgumentException("Collection must not contain null elements");
+		}
 
-		return addSource(function, "Collection Source").returns(typeInfo);
+		TypeInformation<OUT> typeInfo;
+		try {
+			typeInfo = TypeExtractor.getForObject(first);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Could not create TypeInformation for type " + first.getClass()
+					+ "; please specify the TypeInformation manually via "
+					+ "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
+		}
+		return fromCollection(data, typeInfo);
 	}
 
 	/**
-	 * Creates a data stream from the given non-empty collection.Note that this operation will result in
-	 * a non-parallel data stream source, i.e. a data stream source with a degree of parallelism one.
+	 * Creates a data stream from the given non-empty collection.
+	 * 
+	 * <p>Note that this operation will result in a non-parallel data stream source,
+	 * i.e., a data stream source with a parallelism of one.</p>
 	 *
 	 * @param data
 	 * 		The collection of elements to create the data stream from
@@ -522,26 +539,31 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the given collection
 	 */
-	public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT>
-			typeInfo) {
+	public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
 		Preconditions.checkNotNull(data, "Collection must not be null");
-		if (data.isEmpty()) {
-			throw new IllegalArgumentException("Collection must not be empty");
+		
+		// the collection must not contain null elements, and all elements must be of the given type
+		FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
+		
+		SourceFunction<OUT> function;
+		try {
+			function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
 		}
-
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
-		checkCollection(data, typeInfo.getTypeClass());
-
-		return addSource(function, "Collection Source").returns(typeInfo);
+		catch (IOException e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
+		return addSource(function, "Collection Source", typeInfo);
 	}
 
 	/**
-	 * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
-	 * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
-	 * class (this is due to the fact that the Java compiler erases the generic type information).
-	 * <p>
-	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
-	 * degree of parallelism of one.
+	 * Creates a data stream from the given iterator.
+	 * 
+	 * <p>Because the iterator will remain unmodified until the actual execution happens,
+	 * the type of data returned by the iterator must be given explicitly in the form of the type
+	 * class (this is due to the fact that the Java compiler erases the generic type information).</p>
+	 * 
+	 * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
+	 * a data stream source with a parallelism of one.</p>
 	 *
 	 * @param data
 	 * 		The iterator of elements to create the data stream from
@@ -557,14 +579,16 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
-	 * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
-	 * information. This method is useful for cases where the type is generic. In that case, the type class (as
-	 * given in
-	 * {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.
-	 * <p>
-	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
-	 * degree of parallelism one.
+	 * Creates a data stream from the given iterator.
+	 * 
+	 * <p>Because the iterator will remain unmodified until the actual execution happens,
+	 * the type of data returned by the iterator must be given explicitly in the form of the type
+	 * information. This method is useful for cases where the type is generic.
+	 * In that case, the type class (as given in
+	 * {@link #fromCollection(java.util.Iterator, Class)}) does not supply all type information.</p>
+	 * 
+	 * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
+	 * a data stream source with a parallelism of one.</p>
 	 *
 	 * @param data
 	 * 		The iterator of elements to create the data stream from
@@ -574,27 +598,20 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream representing the elements in the iterator
 	 */
-	public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT>
-			typeInfo) {
+	public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo) {
 		Preconditions.checkNotNull(data, "The iterator must not be null");
 
 		SourceFunction<OUT> function = new FromIteratorFunction<OUT>(data);
-		return addSource(function, "Collection Source").returns(typeInfo);
-	}
-
-	// private helper for passing different names
-	private <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> iterator, TypeInformation<OUT>
-			typeInfo, String operatorName) {
-		return addSource(new FromIteratorFunction<OUT>(iterator), operatorName).returns(typeInfo);
+		return addSource(function, "Collection Source", typeInfo);
 	}
 
 	/**
 	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
 	 * framework to create a parallel data stream source that returns the elements in the iterator.
-	 * <p>
-	 * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
+	 * 
+	 * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
 	 * iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler
-	 * erases the generic type information).
+	 * erases the generic type information).</p>
 	 *
 	 * @param iterator
 	 * 		The iterator that produces the elements of the data stream
@@ -1194,19 +1211,6 @@ public abstract class StreamExecutionEnvironment {
 		currentEnvironment = eef.createExecutionEnvironment();
 	}
 
-	private static <OUT> void checkCollection(Collection<OUT> elements, Class<OUT> viewedAs) {
-		Preconditions.checkNotNull(viewedAs);
-
-		for (OUT elem : elements) {
-			Preconditions.checkNotNull(elem, "The collection must not contain null elements.");
-
-			if (!viewedAs.isAssignableFrom(elem.getClass())) {
-				throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
-						viewedAs.getCanonicalName());
-			}
-		}
-	}
-
 	/**
 	 * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
 	 * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 63eb0ad..394fa77 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -27,75 +27,108 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Iterator;
-
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * A stream source function that returns a sequence of elements.
+ * 
+ * <p>Upon construction, this source function serializes the elements using Flink's type information.
+ * That way, any object transport using Java serialization will not be affected by the serializability
+ * of the elements.</p>
+ * 
+ * @param <T> The type of elements returned by this function.
+ */
 public class FromElementsFunction<T> implements SourceFunction<T> {
 	
 	private static final long serialVersionUID = 1L;
 
+	/** The (de)serializer to be used for the data elements */
 	private final TypeSerializer<T> serializer;
-	private final byte[] elements;
+	
+	/** The actual data elements, in serialized form */
+	private final byte[] elementsSerialized;
+	
+	/** The number of serialized elements */
+	private final int numElements;
 
+	/** Flag to make the source cancelable */
 	private volatile boolean isRunning = true;
 
-	public FromElementsFunction(TypeSerializer<T> serializer, final T... elements) {
-		this(serializer, new Iterable<T>() {
-			@Override
-			public Iterator<T> iterator() {
-				return new Iterator<T>() {
-					int index = 0;
-
-					@Override
-					public boolean hasNext() {
-						return index < elements.length;
-					}
-
-					@Override
-					public T next() {
-						return elements[index++];
-					}
-
-					@Override
-					public void remove() {
-						throw new UnsupportedOperationException();
-					}
-				};
-			}
-		});
+	
+	public FromElementsFunction(TypeSerializer<T> serializer, T... elements) throws IOException {
+		this(serializer, Arrays.asList(elements));
 	}
-
-	public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) {
+	
+	public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) throws IOException {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
 
+		int count = 0;
 		try {
 			for (T element : elements) {
 				serializer.serialize(element, wrapper);
+				count++;
 			}
-		} catch (IOException e) {
-			// ByteArrayOutputStream doesn't throw IOExceptions when written to
 		}
-		// closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn doesn't do anything.
+		catch (Exception e) {
+			throw new IOException("Serializing the source elements failed: " + e.getMessage(), e);
+		}
 
 		this.serializer = serializer;
-		this.elements = baos.toByteArray();
+		this.elementsSerialized = baos.toByteArray();
+		this.numElements = count;
 	}
 
 	@Override
 	public void run(SourceContext<T> ctx) throws Exception {
-		T value = serializer.createInstance();
-		ByteArrayInputStream bais = new ByteArrayInputStream(elements);
+		ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
 		DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
 
-		while (isRunning && bais.available() > 0) {
-			value = serializer.deserialize(value, input);
-			ctx.collect(value);
+		int numEmitted = 0;
+		while (isRunning && numEmitted++ < numElements) {
+			T next;
+			try {
+				next = serializer.deserialize(input);
+			}
+			catch (Exception e) {
+				throw new IOException("Failed to deserialize an element from the source. " +
+						"If you are using user-defined serialization (Value and Writable types), check the " +
+						"serialization functions.\nSerializer is " + serializer);
+			}
+			
+			ctx.collect(next);
 		}
-		// closing the DataInputStream would just close the ByteArrayInputStream, which doesn't do anything
 	}
 
 	@Override
 	public void cancel() {
 		isRunning = false;
 	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Verifies that all elements in the collection are non-null, and are of the given class, or
+	 * a subclass thereof.
+	 * 
+	 * @param elements The collection to check.
+	 * @param viewedAs The class to which the elements must be assignable.
+	 * 
+	 * @param <OUT> The generic type of the collection to be checked.
+	 */
+	public static <OUT> void checkCollection(Collection<OUT> elements, Class<OUT> viewedAs) {
+		for (OUT elem : elements) {
+			if (elem == null) {
+				throw new IllegalArgumentException("The collection contains a null element");
+			}
+
+			if (!viewedAs.isAssignableFrom(elem.getClass())) {
+				throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
+						viewedAs.getCanonicalName());
+			}
+		}
+	}
 }
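
As a usage sketch (assuming this commit; the inline collecting context below is ad hoc, in the spirit of the ListSourceContext test helper added further down), the constructor's eager serialization and the run() loop can be exercised directly:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.List;

public class FromElementsDrain {

	public static void main(String[] args) throws Exception {
		// The constructor serializes "a", "b", "c" into a byte array right here;
		// a failure would surface as an IOException at construction time.
		FromElementsFunction<String> source = new FromElementsFunction<String>(
				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), "a", "b", "c");

		final List<String> out = new ArrayList<String>();

		// Minimal inline SourceContext that collects emitted elements into a list.
		source.run(new SourceFunction.SourceContext<String>() {
			private final Object lock = new Object();

			@Override
			public void collect(String element) {
				out.add(element);
			}

			@Override
			public Object getCheckpointLock() {
				return lock;
			}
		});

		// out now contains [a, b, c], deserialized from the byte array.
		System.out.println(out);
	}
}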

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 921a33b..58ee1da 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -45,11 +45,10 @@ import org.apache.flink.api.common.functions.Function;
  * {@code
  *  public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
  *      private long count = 0L;
- *      private volatile boolean isRunning;
+ *      private volatile boolean isRunning = true;
  *
  *      @Override
  *      public void run(SourceContext<T> ctx) {
- *          isRunning = true;
  *          while (isRunning && count < 1000) {
  *              synchronized (ctx.getCheckpointLock()) {
  *                  ctx.collect(count);
@@ -104,16 +103,20 @@ public interface SourceFunction<T> extends Function, Serializable {
 	 *
 	 * @param <T> The type of the elements produced by the source.
 	 */
-	interface SourceContext<T> {
+	public static interface SourceContext<T> {
 
 		/**
 		 * Emits one element from the source.
+		 * 
+		 * @param element The element to emit.
 		 */
 		void collect(T element);
 
 		/**
 		 * Returns the checkpoint lock. Please refer to the explanation about checkpointed sources
 		 * in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
+		 * 
+		 * @return The object to use as the lock.
 		 */
 		Object getCheckpointLock();
 	}
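
The javadoc example above, expanded into a self-contained source for reference (a minimal sketch; the Checkpointed part of the original example is omitted):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CountingSource implements SourceFunction<Long> {

	private static final long serialVersionUID = 1L;

	private long count = 0L;
	private volatile boolean isRunning = true;

	@Override
	public void run(SourceContext<Long> ctx) throws Exception {
		while (isRunning && count < 1000) {
			// Emit and update state under the checkpoint lock, so checkpoints
			// see a consistent pair of (count, emitted elements).
			synchronized (ctx.getCheckpointLock()) {
				ctx.collect(count);
				count++;
			}
		}
	}

	@Override
	public void cancel() {
		isRunning = false;
	}
}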

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 7373276..e2fe599 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -24,14 +24,14 @@ import static org.junit.Assert.fail;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -49,27 +49,33 @@ public class StreamExecutionEnvironmentTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testFromCollectionParallelism() {
-		TypeInformation<Object> typeInfo = TypeExtractor.getForClass(Object.class);
-		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
-		boolean seenExpectedException = false;
-
 		try {
-			DataStream<Object> dataStream1 = env.fromCollection(new DummySplittableIterator(), typeInfo)
-					.setParallelism(4);
-		} catch (IllegalArgumentException e) {
-			seenExpectedException = true;
+			TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
+			StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+
+			DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
+			
+			try {
+				dataStream1.setParallelism(4);
+				fail("should throw an exception");
+			}
+			catch (IllegalArgumentException e) {
+				// expected
+			}
+	
+			env.fromParallelCollection(new DummySplittableIterator<Integer>(), typeInfo).setParallelism(4);
+	
+			String plan = env.getExecutionPlan();
+			
+			assertTrue("Parallelism for dataStream1 is not right.",
+					plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1"));
+			assertTrue("Parallelism for dataStream2 is not right.",
+					plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4"));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
 		}
-
-		DataStream<Object> dataStream2 = env.fromParallelCollection(new DummySplittableIterator(), typeInfo)
-				.setParallelism(4);
-
-		String plan = env.getExecutionPlan();
-
-		assertTrue("Expected Exception for setting parallelism was not thrown.", seenExpectedException);
-		assertTrue("Parallelism for dataStream1 is not right.",
-				plan.contains("\"contents\":\"Collection Source\",\"parallelism\":1"));
-		assertTrue("Parallelism for dataStream2 is not right.",
-				plan.contains("\"contents\":\"Parallel Collection Source\",\"parallelism\":4"));
 	}
 
 	@Test
@@ -119,12 +125,13 @@ public class StreamExecutionEnvironmentTest {
 		return (SourceFunction<T>) operator.getUserFunction();
 	}
 
-	public static class DummySplittableIterator extends SplittableIterator {
+	public static class DummySplittableIterator<T> extends SplittableIterator<T> {
 		private static final long serialVersionUID = 1312752876092210499L;
 
+		@SuppressWarnings("unchecked")
 		@Override
-		public Iterator[] split(int numPartitions) {
-			return new Iterator[0];
+		public Iterator<T>[] split(int numPartitions) {
+			return (Iterator<T>[]) new Iterator<?>[0];
 		}
 
 		@Override
@@ -138,8 +145,8 @@ public class StreamExecutionEnvironmentTest {
 		}
 
 		@Override
-		public Object next() {
-			return null;
+		public T next() {
+			throw new NoSuchElementException();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
new file mode 100644
index 0000000..db91b33
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.ExceptionUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the {@link org.apache.flink.streaming.api.functions.source.FromElementsFunction}.
+ */
+public class FromElementsFunctionTest {
+	
+	@Test
+	public void testStrings() {
+		try {
+			String[] data = { "Oh", "boy", "what", "a", "show", "!"};
+
+			FromElementsFunction<String> source = new FromElementsFunction<String>(
+					BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), data);
+			
+			List<String> result = new ArrayList<String>();
+			source.run(new ListSourceContext<String>(result));
+			
+			assertEquals(Arrays.asList(data), result);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testNonJavaSerializableType() {
+		try {
+			MyPojo[] data = { new MyPojo(1, 2), new MyPojo(3, 4), new MyPojo(5, 6) };
+
+			FromElementsFunction<MyPojo> source = new FromElementsFunction<MyPojo>(
+					TypeExtractor.getForClass(MyPojo.class).createSerializer(new ExecutionConfig()), data);
+
+			List<MyPojo> result = new ArrayList<MyPojo>();
+			source.run(new ListSourceContext<MyPojo>(result));
+
+			assertEquals(Arrays.asList(data), result);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerializationError() {
+		try {
+			TypeInformation<SerializationErrorType> info = 
+					new ValueTypeInfo<SerializationErrorType>(SerializationErrorType.class);
+			
+			try {
+				new FromElementsFunction<SerializationErrorType>(
+					info.createSerializer(new ExecutionConfig()), new SerializationErrorType());
+				
+				fail("should fail with an exception");
+			}
+			catch (IOException e) {
+				assertTrue(ExceptionUtils.stringifyException(e).contains("test exception"));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testDeSerializationError() {
+		try {
+			TypeInformation<DeserializeTooMuchType> info =
+					new ValueTypeInfo<DeserializeTooMuchType>(DeserializeTooMuchType.class);
+
+			FromElementsFunction<DeserializeTooMuchType> source = new FromElementsFunction<DeserializeTooMuchType>(
+					info.createSerializer(new ExecutionConfig()), new DeserializeTooMuchType());
+			
+			try {
+				source.run(new ListSourceContext<DeserializeTooMuchType>(new ArrayList<DeserializeTooMuchType>()));
+				fail("should fail with an exception");
+			}
+			catch (IOException e) {
+				assertTrue(ExceptionUtils.stringifyException(e).contains("user-defined serialization"));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	
+	// ------------------------------------------------------------------------
+	//  Test Types
+	// ------------------------------------------------------------------------
+	
+	public static class MyPojo {
+		
+		public long val1;
+		public int val2;
+
+		public MyPojo() {}
+		
+		public MyPojo(long val1, int val2) {
+			this.val1 = val1;
+			this.val2 = val2;
+		}
+
+		@Override
+		public int hashCode() {
+			return this.val2;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof MyPojo) {
+				MyPojo that = (MyPojo) obj;
+				return this.val1 == that.val1 && this.val2 == that.val2; 
+			}
+			else {
+				return false;
+			}
+		}
+	}
+	
+	public static class SerializationErrorType implements Value {
+
+		private static final long serialVersionUID = -6037206294939421807L;
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			throw new IOException("test exception");
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			throw new IOException("test exception");
+		}
+	}
+
+	public static class DeserializeTooMuchType implements Value {
+
+		private static final long serialVersionUID = -6037206294939421807L;
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(42);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			in.readLong();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
new file mode 100644
index 0000000..e718633
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.List;
+
+/**
+ * Mock context that collects elements in a List.
+ * 
+ * @param <T> Type of the collected elements.
+ */
+public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
+	
+	private final Object lock = new Object();
+	
+	private final List<T> target;
+
+	
+	public ListSourceContext(List<T> target) {
+		this.target = target;
+	}
+
+	@Override
+	public void collect(T element) {
+		target.add(element);
+	}
+
+	@Override
+	public Object getCheckpointLock() {
+		return lock;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28713a29/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 7215a4d..70e652f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.StateHandleProvider
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.SplittableIterator
 
@@ -47,7 +47,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Sets the parallelism for operations executed through this environment.
    * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    * with x parallel instances. This value can be overridden by specific operations using
-   * [[DataStream.setParallelism]].
+   * [[DataStream#setParallelism(int)]].
    * @deprecated Please use [[setParallelism]]
    */
   @deprecated
@@ -57,7 +57,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Returns the default parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataStream.setParallelism]]
+   * value can be overridden by individual operations using [[DataStream#setParallelism(int)]]
    * @deprecated Please use [[getParallelism]]
    */
   @deprecated
@@ -67,7 +67,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Sets the parallelism for operations executed through this environment.
    * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    * with x parallel instances. This value can be overridden by specific operations using
-   * [[DataStream.setParallelism]].
+   * [[DataStream#setParallelism(int)]].
    */
   def setParallelism(parallelism: Int): Unit = {
     javaEnv.setParallelism(parallelism)
@@ -75,7 +75,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Returns the default parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataStream.setParallelism]]
+   * value can be overridden by individual operations using [[DataStream#setParallelism(int)]]
    */
   def getParallelism = javaEnv.getParallelism
 
@@ -86,15 +86,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * can result in three logical modes:
    *
    * <ul>
-   * <li>
-   * A positive integer triggers flushing periodically by that integer</li>
-   * <li>
-   * 0 triggers flushing after every record thus minimizing latency</li>
-   * <li>
-   * -1 triggers flushing only when the output buffer is full thus maximizing
-   * throughput</li>
+   *   <li>A positive integer triggers flushing periodically by that integer</li>
+   *   <li>0 triggers flushing after every record thus minimizing latency</li>
+   *   <li>-1 triggers flushing only when the output buffer is full thus maximizing throughput</li>
    * </ul>
-   *
    */
   def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
     javaEnv.setBufferTimeout(timeoutMillis)
@@ -127,7 +122,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    * Setting this option assumes that the job is used in production and thus if not stated
    * explicitly otherwise with calling with the
-   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
+   * [[setNumberOfExecutionRetries(int)]] method in case of
    * failure the job will be resubmitted to the cluster indefinitely.
    */
   @deprecated
@@ -142,7 +137,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * 
    * Setting this option assumes that the job is used in production and thus if not stated
    * explicitly otherwise with calling with the
-   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
+   * [[setNumberOfExecutionRetries(int)]] method in case of
    * failure the job will be resubmitted to the cluster indefinitely.
    */
   def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
@@ -156,7 +151,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    * Setting this option assumes that the job is used in production and thus if not stated
    * explicitly otherwise with calling with the
-   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
+   * [[setNumberOfExecutionRetries(int)]] method in case of
    * failure the job will be resubmitted to the cluster indefinitely.
    */
   def enableCheckpointing() : StreamExecutionEnvironment = {
@@ -284,10 +279,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     require(data != null, "Data must not be null.")
     val typeInfo = implicitly[TypeInformation[T]]
 
-    val sourceFunction = new FromElementsFunction[T](typeInfo.createSerializer(getConfig),
-      scala.collection.JavaConversions.asJavaCollection(data))
-
-    javaEnv.addSource(sourceFunction).returns(typeInfo)
+    javaEnv.fromCollection(scala.collection.JavaConversions.asJavaCollection(data), typeInfo)
   }
 
   /**
@@ -460,7 +452,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def getExecutionPlan = javaEnv.getExecutionPlan
 
   /**
-   * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
+   * Getter of the [[org.apache.flink.streaming.api.graph.StreamGraph]] of the streaming job.
    *
    * @return The StreamGraph representing the transformations
    */
@@ -468,7 +460,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]
    */
   private[flink] def scalaClean[F <: AnyRef](f: F): F = {
     if (getConfig.isClosureCleanerEnabled) {
@@ -484,7 +476,7 @@ object StreamExecutionEnvironment {
 
   /**
    * Sets the default parallelism that will be used for the local execution
-   * environment created by {@link #createLocalEnvironment()}.
+   * environment created by [[createLocalEnvironment()]].
    *
    * @param parallelism
    * The parallelism to use as the default local parallelism.


[9/9] flink git commit: [FLINK-2330] [streaming] Make FromElementsFunction checkpointable

Posted by se...@apache.org.
[FLINK-2330] [streaming] Make FromElementsFunction checkpointable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e451a4ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e451a4ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e451a4ae

Branch: refs/heads/master
Commit: e451a4ae00d84c2ed1aae1bec6cc2c43caa2bf90
Parents: 28713a2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 8 19:02:59 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:41 2015 +0200

----------------------------------------------------------------------
 .../functions/source/FromElementsFunction.java  | 74 ++++++++++++++++++--
 .../api/functions/FromElementsFunctionTest.java | 74 ++++++++++++++++++++
 .../api/functions/ListSourceContext.java        | 16 +++++
 3 files changed, 158 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
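
In short, the contract added by this commit: snapshotState() returns how many elements have been emitted so far, and restoreState() tells a restored copy how many elements to skip before emitting again. A minimal sketch (the checkpoint id and timestamp are arbitrary):

import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;

public class CheckpointContractSketch {

	public static void main(String[] args) throws Exception {
		FromElementsFunction<Integer> source =
				new FromElementsFunction<Integer>(IntSerializer.INSTANCE, 1, 2, 3, 4, 5);

		// snapshotState returns the number of elements emitted so far
		// (0 here, since run() has not been called yet).
		Integer emittedSoFar = source.snapshotState(1L, System.currentTimeMillis());

		// On recovery, a fresh copy of the function is told how many elements
		// to skip; its run() loop then resumes after the checkpointed position.
		FromElementsFunction<Integer> restored =
				new FromElementsFunction<Integer>(IntSerializer.INSTANCE, 1, 2, 3, 4, 5);
		restored.restoreState(emittedSoFar);
	}
}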


http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 394fa77..28544ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -39,7 +40,7 @@ import java.util.Collection;
  * 
  * @param <T> The type of elements returned by this function.
  */
-public class FromElementsFunction<T> implements SourceFunction<T> {
+public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedAsynchronously<Integer> {
 	
 	private static final long serialVersionUID = 1L;
 
@@ -52,6 +53,12 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 	/** The number of serialized elements */
 	private final int numElements;
 
+	/** The number of elements emitted already */
+	private volatile int numElementsEmitted;
+
+	/** The number of elements to skip initially */
+	private volatile int numElementsToSkip;
+	
 	/** Flag to make the source cancelable */
 	private volatile boolean isRunning = true;
 
@@ -83,10 +90,29 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 	@Override
 	public void run(SourceContext<T> ctx) throws Exception {
 		ByteArrayInputStream bais = new ByteArrayInputStream(elementsSerialized);
-		DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
-
-		int numEmitted = 0;
-		while (isRunning && numEmitted++ < numElements) {
+		final DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
+		
+		// if we are restored from a checkpoint and need to skip elements, skip them now.
+		int toSkip = numElementsToSkip;
+		if (toSkip > 0) {
+			try {
+				while (toSkip > 0) {
+					serializer.deserialize(input);
+					toSkip--;
+				}
+			}
+			catch (Exception e) {
+				throw new IOException("Failed to deserialize an element from the source. " +
+						"If you are using user-defined serialization (Value and Writable types), check the " +
+						"serialization functions.\nSerializer is " + serializer);
+			}
+			
+			this.numElementsEmitted = this.numElementsToSkip;
+		}
+		
+		final Object lock = ctx.getCheckpointLock();
+		
+		while (isRunning && numElementsEmitted < numElements) {
 			T next;
 			try {
 				next = serializer.deserialize(input);
@@ -97,7 +123,10 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 						"serialization functions.\nSerializer is " + serializer);
 			}
 			
-			ctx.collect(next);
+			synchronized (lock) {
+				ctx.collect(next);
+				numElementsEmitted++;
+			}
 		}
 	}
 
@@ -105,6 +134,39 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 	public void cancel() {
 		isRunning = false;
 	}
+
+
+	/**
+	 * Gets the number of elements produced in total by this function.
+	 * 
+	 * @return The number of elements produced in total.
+	 */
+	public int getNumElements() {
+		return numElements;
+	}
+
+	/**
+	 * Gets the number of elements emitted so far.
+	 * 
+	 * @return The number of elements emitted so far.
+	 */
+	public int getNumElementsEmitted() {
+		return numElementsEmitted;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpointing
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
+		return this.numElementsEmitted;
+	}
+
+	@Override
+	public void restoreState(Integer state) {
+		this.numElementsToSkip = state;
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Utilities

http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
index db91b33..9c3653b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
@@ -21,13 +21,17 @@ package org.apache.flink.streaming.api.functions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -125,6 +129,76 @@ public class FromElementsFunctionTest {
 		}
 	}
 	
+	@Test
+	public void testCheckpointAndRestore() {
+		try {
+			final int NUM_ELEMENTS = 10000;
+			
+			List<Integer> data = new ArrayList<Integer>(NUM_ELEMENTS);
+			List<Integer> result = new ArrayList<Integer>(NUM_ELEMENTS);
+			
+			for (int i = 0; i < NUM_ELEMENTS; i++) {
+				data.add(i);
+			}
+			
+			final FromElementsFunction<Integer> source = new FromElementsFunction<Integer>(IntSerializer.INSTANCE, data);
+			final FromElementsFunction<Integer> sourceCopy = CommonTestUtils.createCopySerializable(source);
+			
+			final SourceFunction.SourceContext<Integer> ctx = new ListSourceContext<Integer>(result, 2L);
+			
+			final Throwable[] error = new Throwable[1];
+			
+			// run the source asynchronously
+			Thread runner = new Thread() {
+				@Override
+				public void run() {
+					try {
+						source.run(ctx);
+					}
+					catch (Throwable t) {
+						error[0] = t;
+					}
+				}
+			};
+			runner.start();
+			
+			// wait for a bit 
+			Thread.sleep(1000);
+			
+			// make a checkpoint
+			int count;
+			List<Integer> checkpointData = new ArrayList<Integer>(NUM_ELEMENTS);
+			
+			synchronized (ctx.getCheckpointLock()) {
+				count = source.snapshotState(566, System.currentTimeMillis());
+				checkpointData.addAll(result);
+			}
+			
+			// cancel the source
+			source.cancel();
+			runner.join();
+			
+			// check for errors
+			if (error[0] != null) {
+				System.err.println("Error in asynchronous source runner");
+				error[0].printStackTrace();
+				fail("Error in asynchronous source runner");
+			}
+			
+			// recovery run
+			SourceFunction.SourceContext<Integer> newCtx = new ListSourceContext<Integer>(checkpointData);
+			sourceCopy.restoreState(count);
+			
+			sourceCopy.run(newCtx);
+			
+			assertEquals(data, checkpointData);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
 	
 	// ------------------------------------------------------------------------
 	//  Test Types

http://git-wip-us.apache.org/repos/asf/flink/blob/e451a4ae/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
index e718633..f241955 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
@@ -33,14 +33,30 @@ public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
 	
 	private final List<T> target;
 
+	private final long delay;
+	
 	
 	public ListSourceContext(List<T> target) {
+		this(target, 0L);
+	}
+
+	public ListSourceContext(List<T> target, long delay) {
 		this.target = target;
+		this.delay = delay;
 	}
 
 	@Override
 	public void collect(T element) {
 		target.add(element);
+		
+		if (delay > 0) {
+			try {
+				Thread.sleep(delay);
+			}
+			catch (InterruptedException e) {
+				// ignore
+			}
+		}
 	}
 
 	@Override


[7/9] flink git commit: [FLINK-2288] [FLINK-2302] Setup ZooKeeper for distributed coordination

Posted by se...@apache.org.
[FLINK-2288] [FLINK-2302] Setup ZooKeeper for distributed coordination

- FLINK-2288: Setup ZooKeeper for distributed coordination
  * Add FlinkZooKeeperQuorumPeer to wrap ZooKeeper's quorum peers with
    utilities to write required config values (default datadir, myid)
  * Add default conf/zoo.cfg config for ZooKeeper
  * Add startup scripts for ZooKeeper quorum
  * Add conf/masters file for HA masters

- FLINK-2302: Allow multiple instances to run on single host
  * Multiple TaskManager and JobManager instances can run on a single
    host.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c72b50d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c72b50d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c72b50d

Branch: refs/heads/master
Commit: 8c72b50d87c5a4946b67d722fc1ec3adb7c9bf16
Parents: 60cfa0b
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Jul 3 11:04:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  58 ++++-
 flink-dist/src/main/flink-bin/bin/config.sh     |  55 ++++-
 .../src/main/flink-bin/bin/flink-daemon.sh      | 146 ++++++++++++
 flink-dist/src/main/flink-bin/bin/jobmanager.sh | 112 +++------
 .../flink-bin/bin/start-cluster-streaming.sh    |  28 +--
 .../src/main/flink-bin/bin/start-cluster.sh     |  46 ++--
 .../main/flink-bin/bin/start-local-streaming.sh |   7 +-
 .../src/main/flink-bin/bin/start-local.sh       |   7 +-
 .../flink-bin/bin/start-zookeeper-quorum.sh     |  46 ++++
 .../src/main/flink-bin/bin/stop-cluster.sh      |  37 ++-
 .../main/flink-bin/bin/stop-zookeeper-quorum.sh |  46 ++++
 .../src/main/flink-bin/bin/taskmanager.sh       |  88 ++-----
 flink-dist/src/main/flink-bin/bin/zookeeper.sh  |  56 +++++
 flink-dist/src/main/flink-bin/conf/masters      |   1 +
 flink-dist/src/main/flink-bin/conf/zoo.cfg      |  18 ++
 flink-runtime/pom.xml                           |  51 +++--
 .../jobmanager/JobManagerCliOptions.java        |  12 +
 .../flink/runtime/util/ZooKeeperUtil.java       | 110 +++++++++
 .../zookeeper/FlinkZooKeeperQuorumPeer.java     | 227 +++++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  80 +++++--
 .../flink/runtime/taskmanager/TaskManager.scala |   7 +-
 .../flink/runtime/util/ZooKeeperUtilTest.java   |  76 +++++++
 pom.xml                                         |  16 ++
 23 files changed, 1055 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9d8da21..e3b8b53 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -671,11 +671,61 @@ public final class ConfigConstants {
 	 */
 	public static final String LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER = "localinstancemanager.numtaskmanager";
 
-
 	public static final String LOCAL_INSTANCE_MANAGER_START_WEBSERVER = "localinstancemanager.start-webserver";
-	
-	// ------------------------------------------------------------------------
-	
+
+	// --------------------------- ZooKeeper ----------------------------------
+
+	/** ZooKeeper servers. */
+	public static final String ZOOKEEPER_QUORUM_KEY = "ha.zookeeper.quorum";
+
+	/** ZooKeeper root path. */
+	public static final String ZOOKEEPER_DIR_KEY = "ha.zookeeper.dir";
+
+	public static final String ZOOKEEPER_LATCH_PATH = "ha.zookeeper.dir.latch";
+
+	public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
+
+	public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
+
+	public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
+
+	public static final String ZOOKEEPER_RETRY_WAIT = "ha.zookeeper.client.retry-wait";
+
+	public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "ha.zookeeper.client.max-retry-attempts";
+
+	// - Defaults -------------------------------------------------------------
+
+	public static final String DEFAULT_ZOOKEEPER_ZNODE_ROOT = "/flink";
+
+	public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";
+
+	public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";
+
+	public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
+
+	public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;
+
+	public static final int DEFAULT_ZOOKEEPER_RETRY_WAIT = 5000;
+
+	public static final int DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS = 3;
+
+	// - Defaults for required ZooKeeper configuration keys -------------------
+
+	/** ZooKeeper default client port. */
+	public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
+
+	/** ZooKeeper default init limit. */
+	public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
+
+	/** ZooKeeper default sync limit. */
+	public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
+
+	/** ZooKeeper default peer port. */
+	public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888;
+
+	/** ZooKeeper default leader port. */
+	public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
+
 	/**
 	 * Not instantiable.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 341144c..99cfa86 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -94,6 +94,8 @@ KEY_ENV_LOG_MAX="env.log.max"
 KEY_ENV_JAVA_HOME="env.java.home"
 KEY_ENV_JAVA_OPTS="env.java.opts"
 KEY_ENV_SSH_OPTS="env.ssh.opts"
+KEY_ZK_QUORUM="ha.zookeeper.quorum"
+KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 
 ########################################################################################################################
 # PATHS AND CONFIG
@@ -196,10 +198,21 @@ if [ -z "${FLINK_SSH_OPTS}" ]; then
     FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
 fi
 
+# Define ZK_HEAP if it is not already set
+if [ -z "${ZK_HEAP}" ]; then
+    ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
+fi
+
+if [ -z "${ZK_QUORUM}" ]; then
+    ZK_QUORUM=$(readFromConfig ${KEY_ZK_QUORUM} "" "${YAML_CONF}")
+fi
+
 # Arguments for the JVM. Used for job and task manager JVMs.
 # DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
 # KEY_JOBM_HEAP_MB and KEY_TASKM_HEAP_MB for that!
-JVM_ARGS=""
+if [ -z "${JVM_ARGS}" ]; then
+    JVM_ARGS=""
+fi
 
 # Check if deprecated HADOOP_HOME is set.
 if [ -n "$HADOOP_HOME" ]; then
@@ -243,3 +256,43 @@ rotateLogFile() {
         mv "$log" "$log.$num";
     fi
 }
+
+readMasters() {
+    MASTERS_FILE="${FLINK_CONF_DIR}/masters"
+
+    if [[ ! -f "${MASTERS_FILE}" ]]; then
+        echo "No masters file. Please specify masters in 'conf/masters'."
+        exit 1
+    fi
+
+    MASTERS=()
+
+    GOON=true
+    while $GOON; do
+        read line || GOON=false
+        HOST=$( extractHostName $line)
+        if [ -n "$HOST" ]; then
+            MASTERS+=(${HOST})
+        fi
+    done < "$MASTERS_FILE"
+}
+
+readSlaves() {
+    SLAVES_FILE="${FLINK_CONF_DIR}/slaves"
+
+    if [[ ! -f "$SLAVES_FILE" ]]; then
+        echo "No slaves file. Please specify slaves in 'conf/slaves'."
+        exit 1
+    fi
+
+    SLAVES=()
+
+    GOON=true
+    while $GOON; do
+        read line || GOON=false
+        HOST=$( extractHostName $line)
+        if [ -n "$HOST" ]; then
+            SLAVES+=(${HOST})
+        fi
+    done < "$SLAVES_FILE"
+}

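The new readMasters/readSlaves helpers expect plain host lists under conf/. A
minimal sketch of the two files for a hypothetical two-master, three-worker
setup (host names are illustrative):

    # conf/masters -- one (HA) JobManager host per line
    printf 'master-1\nmaster-2\n' > conf/masters

    # conf/slaves -- one TaskManager host per line
    printf 'worker-1\nworker-2\nworker-3\n' > conf/slaves
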
http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
new file mode 100644
index 0000000..1fc110a
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -0,0 +1,146 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a Flink daemon.
+USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|zookeeper) [args]"
+
+STARTSTOP=$1
+DAEMON=$2
+ARGS=$3
+
+case $DAEMON in
+    (jobmanager)
+        CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
+    ;;
+
+    (taskmanager)
+        CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
+    ;;
+
+    (zookeeper)
+        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
+    ;;
+
+    (*)
+        echo "Unknown daemon '${DAEMON}'. $USAGE."
+        exit 1
+    ;;
+esac
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+    FLINK_IDENT_STRING="$USER"
+fi
+
+FLINK_TM_CLASSPATH=`constructFlinkClassPath`
+
+pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid
+
+mkdir -p "$FLINK_PID_DIR"
+
+# Ascending ID depending on number of lines in pid file.
+# This allows us to start multiple daemons of each type.
+id=$([ -f "$pid" ] && echo $(wc -l < $pid) || echo "0")
+
+log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-$DAEMON-$id-$HOSTNAME.log
+out=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-$DAEMON-$id-$HOSTNAME.out
+
+log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
+
+JAVA_VERSION=$($JAVA_RUN -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+
+# Only set JVM 8 arguments if we have correctly extracted the version
+if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
+    if [ "$JAVA_VERSION" -lt 18 ]; then
+        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
+    fi
+fi
+
+case $STARTSTOP in
+
+    (start)
+        # Rotate log files
+        rotateLogFile $log
+        rotateLogFile $out
+
+        echo "Starting $DAEMON daemon on host $HOSTNAME."
+        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} ${ARGS} > "$out" 2>&1 < /dev/null &
+        mypid=$!
+
+        # Add to pid file if successful start
+        if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
+            echo $mypid >> $pid
+        else
+            echo "Error starting $DAEMON daemon."
+            exit 1
+        fi
+    ;;
+
+    (stop)
+        if [ -f $pid ]; then
+            # Remove last in pid file
+            to_stop=$(tail -n 1 $pid)
+
+            if [ -z $to_stop ]; then
+                rm $pid # If all stopped, clean up pid file
+                echo "No $DAEMON daemon to stop on host $HOSTNAME."
+            else
+                sed \$d $pid > $pid.tmp # all but last line
+
+                # If all stopped, clean up pid file
+                [ $(wc -l < $pid.tmp) -eq 0 ] && rm $pid $pid.tmp || mv $pid.tmp $pid
+
+                if kill -0 $to_stop > /dev/null 2>&1; then
+                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
+                    kill $to_stop
+                else
+                    echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME."
+                fi
+            fi
+        else
+            echo "No $DAEMON daemon to stop on host $HOSTNAME."
+        fi
+    ;;
+
+    (stop-all)
+        if [ -f $pid ]; then
+            mv $pid ${pid}.tmp
+
+            while read to_stop; do
+                if kill -0 $to_stop > /dev/null 2>&1; then
+                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
+                    kill $to_stop
+                else
+                    echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME."
+                fi
+            done < ${pid}.tmp
+            rm ${pid}.tmp
+        fi
+    ;;
+
+    (*)
+        echo "Unexpected argument '$STARTSTOP'. $USAGE."
+        exit 1
+    ;;
+
+esac

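The pid-file handling above is what allows several daemons of one type per
host: every start appends a pid, 'stop' pops the most recently started one,
and 'stop-all' drains the file. A sketch, assuming a single local host:

    bin/flink-daemon.sh start taskmanager "--configDir conf --streamingMode batch"
    bin/flink-daemon.sh start taskmanager "--configDir conf --streamingMode batch"
    # the pid file now holds two pids; the log/out files carry ids 0 and 1

    bin/flink-daemon.sh stop taskmanager       # stops the last-started instance
    bin/flink-daemon.sh stop-all taskmanager   # stops all remaining instances
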
http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index afe35d8..450d36b 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -17,104 +17,54 @@
 # limitations under the License.
 ################################################################################
 
+# Start/stop a Flink JobManager.
+USAGE="Usage: jobmanager.sh (start (local|cluster) [batch|streaming] [host])|stop|stop-all)"
 
 STARTSTOP=$1
 EXECUTIONMODE=$2
 STREAMINGMODE=$3
+HOST=$4 # optional when starting multiple instances
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
 . "$bin"/config.sh
 
-JAVA_VERSION=$($JAVA_RUN -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
-
-# Only set JVM 8 arguments if we have correctly extracted the version
-if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
-    if [ "$JAVA_VERSION" -lt 18 ]; then
-        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
+if [[ $STARTSTOP == "start" ]]; then
+    if [ -z $EXECUTIONMODE ]; then
+        echo "Missing execution mode (local|cluster) argument. $USAGE."
+        exit 1
     fi
-fi
-
-if [ "$FLINK_IDENT_STRING" = "" ]; then
-    FLINK_IDENT_STRING="$USER"
-fi
-
-FLINK_JM_CLASSPATH=`constructFlinkClassPath`
-
-log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-jobmanager-$HOSTNAME.log
-out=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-jobmanager-$HOSTNAME.out
-pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-jobmanager.pid
-log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
-
-case $STARTSTOP in
-
-    (start)
 
-        if [ -z $EXECUTIONMODE ]; then
-            echo "Please specify 'start (cluster|local) [batch|streaming]' or 'stop'"
-            exit 1
-        fi
-
-        # Use batch mode as default
-        if [ -z $STREAMINGMODE ]; then
-            echo "Did not specify [batch|streaming] mode. Falling back to batch mode as default."
-            STREAMINGMODE="batch"
-        fi
+    # Use batch mode as default
+    if [ -z $STREAMINGMODE ]; then
+        echo "Missing streaming mode (batch|streaming) argument. Using 'batch'."
+        STREAMINGMODE="batch"
+    fi
 
-        if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]]; then
-            echo "ERROR: Configured job manager heap size is not a number. Cancelling job manager startup."
+    if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]]; then
+        echo "[ERROR] Configured JobManager JVM heap size is not a number. Please set '$KEY_JOBM_HEAP_MB' in $FLINK_CONF_FILE."
+        exit 1
+    fi
 
+    if [ "$EXECUTIONMODE" = "local" ]; then
+        if [[ ! ${FLINK_TM_HEAP} =~ $IS_NUMBER ]]; then
+            echo "[ERROR] Configured JobManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
             exit 1
         fi
 
-        if [ "$EXECUTIONMODE" = "local" ]; then
-            if [[ ! ${FLINK_TM_HEAP} =~ $IS_NUMBER ]]; then
-                echo "ERROR: Configured task manager heap size is not a number. Cancelling (local) job manager startup."
-
-                exit 1
-            fi
-
-            FLINK_JM_HEAP=`expr $FLINK_JM_HEAP + $FLINK_TM_HEAP`
-        fi
-
-        if [ "$FLINK_JM_HEAP" -gt 0 ]; then
-            JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP"m -Xmx"$FLINK_JM_HEAP"m"
-        fi
-
-        mkdir -p "$FLINK_PID_DIR"
-        if [ -f $pid ]; then
-            if kill -0 `cat $pid` > /dev/null 2>&1; then
-                echo Job manager running as process `cat $pid`.  Stop it first.
-                exit 1
-            fi
-        fi
-
-        # Rotate log files
-        rotateLogFile $log
-        rotateLogFile $out
-
-        echo "Starting Job Manager"
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR" --executionMode $EXECUTIONMODE --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
-        echo $! > $pid
-
-    ;;
+        FLINK_JM_HEAP=`expr $FLINK_JM_HEAP + $FLINK_TM_HEAP`
+    fi
 
-    (stop)
-        if [ -f $pid ]; then
-            if kill -0 `cat $pid` > /dev/null 2>&1; then
-                echo "Stopping job manager"
-                kill `cat $pid`
-            else
-                echo "No job manager to stop"
-            fi
-        else
-            echo "No job manager to stop"
-        fi
-    ;;
+    if [ "$FLINK_JM_HEAP" -gt 0 ]; then
+        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP"m -Xmx"$FLINK_JM_HEAP"m"
+    fi
 
-    (*)
-        echo "Please specify 'start (cluster|local) [batch|streaming]' or 'stop'"
-    ;;
+    # Startup parameters
+    args="--configDir ${FLINK_CONF_DIR} --executionMode ${EXECUTIONMODE} --streamingMode ${STREAMINGMODE}"
+    if [ ! -z $HOST ]; then
+        args="${args} --host $HOST"
+    fi
+fi
 
-esac
+${bin}/flink-daemon.sh $STARTSTOP jobmanager "${args}"

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
index da13029..d48a6e3fb 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
@@ -17,32 +17,8 @@
 # limitations under the License.
 ################################################################################
 
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
-. "$bin"/config.sh
-
-HOSTLIST=$FLINK_SLAVES
-
-if [ "$HOSTLIST" = "" ]; then
-    HOSTLIST="${FLINK_CONF_DIR}/slaves"
-fi
-
-if [ ! -f "$HOSTLIST" ]; then
-    echo $HOSTLIST is not a valid slave list
-    exit 1
-fi
-
-# cluster mode, bring up job manager locally and a task manager on every slave host
-"$FLINK_BIN_DIR"/jobmanager.sh start cluster streaming
-
-GOON=true
-while $GOON
-do
-    read line || GOON=false
-    HOST=$( extractHostName $line)
-    if [ -n "$HOST" ]; then
-        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start streaming &"
-    fi
-done < "$HOSTLIST"
+# Start a Flink cluster in streaming mode
+${bin}/start-cluster.sh streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index 36f5bc3..2183841 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -17,32 +17,40 @@
 # limitations under the License.
 ################################################################################
 
+# Start a Flink cluster in batch or streaming mode
+USAGE="Usage: start-cluster.sh [batch|streaming]"
+
+STREAMING_MODE=$1
+
+if [[ -z $STREAMING_MODE ]]; then
+    STREAMING_MODE="batch"
+fi
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
 . "$bin"/config.sh
 
-HOSTLIST=$FLINK_SLAVES
+# Start the JobManager instance(s)
+if [[ -z $ZK_QUORUM ]]; then
+    echo "Starting cluster (${STREAMING_MODE} mode)."
 
-if [ "$HOSTLIST" = "" ]; then
-    HOSTLIST="${FLINK_CONF_DIR}/slaves"
-fi
+    # Start single JobManager on this machine
+    "$bin"/jobmanager.sh start cluster ${STREAMING_MODE}
+else
+    # HA Mode
+    readMasters
+
+    echo "Starting HA cluster (${STREAMING_MODE} mode) with ${#MASTERS[@]} masters and ${#ZK_QUORUM[@]} peers in ZooKeeper quorum."
 
-if [ ! -f "$HOSTLIST" ]; then
-    echo $HOSTLIST is not a valid slave list
-    exit 1
+    for master in ${MASTERS[@]}; do
+        ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l $FLINK_BIN_DIR/jobmanager.sh start cluster ${STREAMING_MODE} ${master} &"
+    done
 fi
 
-# cluster mode, bring up job manager locally and a task manager on every slave host
-"$FLINK_BIN_DIR"/jobmanager.sh start cluster batch
-
-GOON=true
-while $GOON
-do
-    read line || GOON=false
-    HOST=$( extractHostName $line)
-    if [ -n "$HOST" ]; then
-        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start batch &"
-    fi
-done < "$HOSTLIST"
+# Start TaskManager instance(s)
+readSlaves
+
+for slave in ${SLAVES[@]}; do
+    ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l $bin/taskmanager.sh start ${STREAMING_MODE} &"
+done

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
index 2cb4d4a..688bffa 100755
--- a/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
@@ -17,11 +17,8 @@
 # limitations under the License.
 ################################################################################
 
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
-. "$bin"/config.sh
-
-# local mode, only bring up job manager. The job manager will start an internal task manager
-"$FLINK_BIN_DIR"/jobmanager.sh start local streaming
+# Start a local Flink cluster in streaming mode
+${bin}/start-local.sh streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/start-local.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.sh b/flink-dist/src/main/flink-bin/bin/start-local.sh
index 7ea3ff4..626fa69 100755
--- a/flink-dist/src/main/flink-bin/bin/start-local.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-local.sh
@@ -17,6 +17,11 @@
 # limitations under the License.
 ################################################################################
 
+STREAMING_MODE=$1
+
+if [[ -z $STREAMING_MODE ]]; then
+    STREAMING_MODE="batch"
+fi
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
@@ -24,4 +29,4 @@ bin=`cd "$bin"; pwd`
 . "$bin"/config.sh
 
 # local mode, only bring up job manager. The job manager will start an internal task manager
-"$FLINK_BIN_DIR"/jobmanager.sh start local batch
+"$FLINK_BIN_DIR"/jobmanager.sh start local ${STREAMING_MODE}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/start-zookeeper-quorum.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-zookeeper-quorum.sh b/flink-dist/src/main/flink-bin/bin/start-zookeeper-quorum.sh
new file mode 100755
index 0000000..d2ff914
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/start-zookeeper-quorum.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Starts a ZooKeeper quorum as configured in $FLINK_CONF/zoo.cfg
+
+ZK_CONF=$FLINK_CONF_DIR/zoo.cfg
+if [ ! -f $ZK_CONF ]; then
+    echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+    exit 1
+fi
+
+# Extract server.X from ZooKeeper config and start instances
+while read server ; do
+    server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+    # match server.id=address[:port[:port]]
+    if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=([^: \#]+) ]]; then
+        id=${BASH_REMATCH[1]}
+        address=${BASH_REMATCH[2]}
+
+        ssh -n $FLINK_SSH_OPTS $address -- "nohup /bin/bash -l $bin/zookeeper.sh start $id &"
+    else
+        echo "[WARN] Parse error. Skipping config entry '$server'."
+    fi
+done < <(grep "^server\." $ZK_CONF)

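To make the server.X parsing concrete, a sketch of entries the loop above
accepts or skips (all entries illustrative):

    # accepted -- id '1', address 'zk-1'; the regex strips the port suffix:
    #   server.1=zk-1:2888:3888
    # accepted -- whitespace before '=' is tolerated:
    #   server.2 =zk-2
    # skipped with a [WARN] -- nothing usable after '=':
    #   server.3=
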
http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index cf2bf9b..17a5daf 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -17,33 +17,26 @@
 # limitations under the License.
 ################################################################################
 
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
 . "$bin"/config.sh
 
-HOSTLIST=$FLINK_SLAVES
-
-if [ "$HOSTLIST" = "" ]; then
-    HOSTLIST="${FLINK_CONF_DIR}/slaves"
-fi
-
-if [ ! -f "$HOSTLIST" ]; then
-    echo $HOSTLIST is not a valid slave list
-    exit 1
-fi
+# Stop TaskManager instance(s)
+readSlaves
 
+for slave in ${SLAVES[@]}; do
+    ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l $bin/taskmanager.sh stop &"
+done
 
-GOON=true
-while $GOON
-do
-    read line || GOON=false
-    HOST=$( extractHostName $line)
-    if [ -n "$HOST" ]; then
-        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash $FLINK_BIN_DIR/taskmanager.sh stop &"
-    fi
-done < $HOSTLIST
+# Stop JobManager instance(s)
+if [[ -z $ZK_QUORUM ]]; then
+    "$bin"/jobmanager.sh stop
+else
+    # HA Mode
+    readMasters
 
-# cluster mode, stop the job manager locally and stop the task manager on every slave host
-"$FLINK_BIN_DIR"/jobmanager.sh stop
+    for master in ${MASTERS[@]}; do
+        ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l $bin/jobmanager.sh stop &"
+    done
+fi

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/stop-zookeeper-quorum.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-zookeeper-quorum.sh b/flink-dist/src/main/flink-bin/bin/stop-zookeeper-quorum.sh
new file mode 100755
index 0000000..4d19cbd
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/stop-zookeeper-quorum.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Stops a ZooKeeper quorum as configured in $FLINK_CONF/zoo.cfg
+
+ZK_CONF=$FLINK_CONF_DIR/zoo.cfg
+if [ ! -f $ZK_CONF ]; then
+    echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+    exit 1
+fi
+
+# Extract server.X from ZooKeeper config and stop instances
+while read server ; do
+    server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+    # match server.id=address[:port[:port]]
+    if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=([^: \#]+) ]]; then
+        id=${BASH_REMATCH[1]}
+        server=${BASH_REMATCH[2]}
+
+        ssh -n $FLINK_SSH_OPTS $server -- "nohup /bin/bash -l $bin/zookeeper.sh stop &"
+    else
+        echo "[WARN] Parse error. Skipping config entry '$server'."
+    fi
+done < <(grep "^server\." $ZK_CONF)

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index a35ad77..1920bd6 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -17,6 +17,8 @@
 # limitations under the License.
 ################################################################################
 
+# Start/stop a Flink TaskManager.
+USAGE="Usage: taskmanager.sh ((start [batch|streaming])|stop|stop-all)"
 
 STARTSTOP=$1
 STREAMINGMODE=$2
@@ -26,78 +28,24 @@ bin=`cd "$bin"; pwd`
 
 . "$bin"/config.sh
 
-if [ "$FLINK_IDENT_STRING" = "" ]; then
-    FLINK_IDENT_STRING="$USER"
-fi
-
-FLINK_TM_CLASSPATH=`constructFlinkClassPath`
-
-log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-taskmanager-$HOSTNAME.log
-out=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-taskmanager-$HOSTNAME.out
-pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-taskmanager.pid
-log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
-
-JAVA_VERSION=$($JAVA_RUN -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
-
-# Only set JVM 8 arguments if we have correctly extracted the version
-if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
-    if [ "$JAVA_VERSION" -lt 18 ]; then
-        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
+if [[ $STARTSTOP == "start" ]]; then
+    # Use batch mode as default
+    if [ -z $STREAMINGMODE ]; then
+        echo "Missing streaming mode (batch|streaming). Using 'batch'."
+        STREAMINGMODE="batch"
     fi
-fi
-
-case $STARTSTOP in
-
-    (start)
-
-        # Use batch mode as default
-        if [ -z $STREAMINGMODE ]; then
-            echo "Did not specify [batch|streaming] mode. Falling back to batch mode as default."
-            STREAMINGMODE="batch"
-        fi
 
-        if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-            echo "ERROR: Configured task manager heap size is not a number. Cancelling task manager startup."
-            exit 1
-        fi
-
-        if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-            JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
-        fi
-
-        mkdir -p "$FLINK_PID_DIR"
-        if [ -f $pid ]; then
-            if kill -0 `cat $pid` > /dev/null 2>&1; then
-                echo Task manager running as process `cat $pid` on host $HOSTNAME.  Stop it first.
-                exit 1
-            fi
-        fi
-
-        # Rotate log files
-        rotateLogFile $log
-        rotateLogFile $out
-
-        echo Starting task manager on host $HOSTNAME
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
-        echo $! > $pid
-
-    ;;
+    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+        exit 1
+    fi
 
-    (stop)
-        if [ -f $pid ]; then
-            if kill -0 `cat $pid` > /dev/null 2>&1; then
-                echo Stopping task manager on host $HOSTNAME
-                kill `cat $pid`
-            else
-                echo No task manager to stop on host $HOSTNAME
-            fi
-        else
-            echo No task manager to stop on host $HOSTNAME
-        fi
-    ;;
+    if [ "$FLINK_TM_HEAP" -gt 0 ]; then
+        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
+    fi
 
-    (*)
-        echo "Please specify 'start [batch|streaming]' or 'stop'"
-    ;;
+    # Startup parameters
+    args="--configDir ${FLINK_CONF_DIR} --streamingMode ${STREAMINGMODE}"
+fi
 
-esac
+${bin}/flink-daemon.sh $STARTSTOP taskmanager "${args}"

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/zookeeper.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/zookeeper.sh b/flink-dist/src/main/flink-bin/bin/zookeeper.sh
new file mode 100755
index 0000000..0bbf6d5
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/zookeeper.sh
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a ZooKeeper quorum peer.
+USAGE="Usage: zookeeper.sh (start peer-id|stop|stop-all)"
+
+STARTSTOP=$1
+PEER_ID=$2
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+ZK_CONF=$FLINK_CONF_DIR/zoo.cfg
+if [ ! -f $ZK_CONF ]; then
+    echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+    exit 1
+fi
+
+if [[ $STARTSTOP == "start" ]]; then
+    if [ -z $PEER_ID ]; then
+        echo "[ERROR] Missing peer id argument. $USAGE."
+        exit 1
+    fi
+
+    if [[ ! ${ZK_HEAP} =~ ${IS_NUMBER} ]]; then
+        echo "[ERROR] Configured ZooKeeper JVM heap size is not a number. Please set '$KEY_ZK_HEAP_MB' in $FLINK_CONF_FILE."
+        exit 1
+    fi
+
+    if [ "$ZK_HEAP" -gt 0 ]; then
+        export JVM_ARGS="$JVM_ARGS -Xms"$ZK_HEAP"m -Xmx"$ZK_HEAP"m"
+    fi
+
+    # Startup parameters
+    args="--zkConfigFile $ZK_CONF --peerId $PEER_ID"
+fi
+
+${bin}/flink-daemon.sh $STARTSTOP zookeeper "${args}"

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/conf/masters
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/masters b/flink-dist/src/main/flink-bin/conf/masters
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/conf/masters
@@ -0,0 +1 @@
+localhost

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/conf/zoo.cfg
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/zoo.cfg b/flink-dist/src/main/flink-bin/conf/zoo.cfg
new file mode 100644
index 0000000..a14ec66
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/conf/zoo.cfg
@@ -0,0 +1,18 @@
+# The number of milliseconds of each tick
+tickTime=2000
+
+# The number of ticks that the initial synchronization phase can take
+initLimit=10
+
+# The number of ticks that can pass between sending a request and getting an acknowledgement
+syncLimit=5
+
+# The directory where the snapshot is stored.
+# dataDir=/tmp/zookeeper
+
+# The port at which the clients will connect
+clientPort=2181
+
+# ZooKeeper quorum peers
+server.1=localhost:2888:3888
+# server.2=host:peer-port:leader-port

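Extending the commented server.2 line above to a real (odd-sized) quorum is a
one-liner; a sketch with illustrative host names -- the matching 'myid' file is
written per peer by FlinkZooKeeperQuorumPeer (see below):

    printf '%s\n' \
        'server.1=zk-1:2888:3888' \
        'server.2=zk-2:2888:3888' \
        'server.3=zk-3:2888:3888' >> conf/zoo.cfg
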
http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index f92fb6e..62e85c1 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -34,9 +34,9 @@ under the License.
 
 	<packaging>jar</packaging>
 
-    <properties>
-        <metrics.version>3.1.0</metrics.version>
-    </properties>
+	<properties>
+		<metrics.version>3.1.0</metrics.version>
+	</properties>
 
 	<dependencies>
 		<dependency>
@@ -168,23 +168,32 @@ under the License.
 			</exclusions>
 		</dependency>
 
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-core</artifactId>
-            <version>${metrics.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-jvm</artifactId>
-            <version>${metrics.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-json</artifactId>
-            <version>${metrics.version}</version>
-        </dependency>
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-jvm</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-json</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.zookeeper</groupId>
+			<artifactId>zookeeper</artifactId>
+		</dependency>
 
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-recipes</artifactId>
+		</dependency>
 	</dependencies>
 
 	<build>
@@ -196,7 +205,7 @@ under the License.
 				<version>3.1.4</version>
 				<executions>
 					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-					    scala classes can be resolved later in the (Java) compile phase -->
+						scala classes can be resolved later in the (Java) compile phase -->
 					<execution>
 						<id>scala-compile-first</id>
 						<phase>process-resources</phase>
@@ -206,7 +215,7 @@ under the License.
 					</execution>
 
 					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-					     scala classes can be resolved later in the (Java) test-compile phase -->
+						 scala classes can be resolved later in the (Java) test-compile phase -->
 					<execution>
 						<id>scala-test-compile</id>
 						<phase>process-test-resources</phase>

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
index 988e3a7..edfa87b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.runtime.StreamingMode;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * The command line parameters passed to the TaskManager.
  */
@@ -31,6 +33,8 @@ public class JobManagerCliOptions {
 	
 	private StreamingMode streamingMode = StreamingMode.BATCH_ONLY;
 
+	private String host;
+
 	// ------------------------------------------------------------------------
 
 	public String getConfigDir() {
@@ -74,4 +78,12 @@ public class JobManagerCliOptions {
 					"Unknown streaming mode. Streaming mode must be one of 'BATCH' or 'STREAMING'.");
 		}
 	}
+
+	public String getHost() {
+		return host;
+	}
+
+	public void setHost(String host) {
+		this.host = checkNotNull(host);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java
new file mode 100644
index 0000000..c3d9df4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper utilities.
+ */
+public class ZooKeeperUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtil.class);
+
+	public static CuratorFramework createCuratorFramework(Configuration configuration) throws Exception {
+		String zkQuorum = ZooKeeperUtil.getZooKeeperEnsemble(configuration);
+
+		if (zkQuorum == null || zkQuorum.equals("")) {
+			throw new RuntimeException("No valid ZooKeeper quorum has been specified.");
+		}
+
+		int sessionTimeout = configuration.getInteger(
+				ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+
+		int connectionTimeout = configuration.getInteger(
+				ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
+
+		int retryWait = configuration.getInteger(
+				ConfigConstants.ZOOKEEPER_RETRY_WAIT,
+				ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
+
+		int maxRetryAttempts = configuration.getInteger(
+				ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+				ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+
+		String root = configuration.getString(ConfigConstants.ZOOKEEPER_DIR_KEY,
+				ConfigConstants.DEFAULT_ZOOKEEPER_ZNODE_ROOT);
+
+		LOG.info("Using '{}' as root namespace.", root);
+
+		CuratorFramework cf = CuratorFrameworkFactory.builder()
+				.connectString(zkQuorum)
+				.sessionTimeoutMs(sessionTimeout)
+				.connectionTimeoutMs(connectionTimeout)
+				.retryPolicy(new ExponentialBackoffRetry(retryWait, maxRetryAttempts))
+				// Curator prepends a '/' manually and throws an Exception if the
+				// namespace starts with a '/'.
+				.namespace(root.startsWith("/") ? root.substring(1) : root)
+				.build();
+
+		try {
+			cf.start();
+		}
+		catch (Exception e) {
+			throw new Exception("Could not start CuratorFramework.", e);
+		}
+
+		return cf;
+	}
+
+	/**
+	 * Returns whether high availability is enabled (i.e. a ZooKeeper quorum is configured).
+	 */
+	public static boolean isJobManagerHighAvailabilityEnabled(Configuration flinkConf) {
+		return flinkConf.containsKey(ConfigConstants.ZOOKEEPER_QUORUM_KEY);
+	}
+
+	/**
+	 * Returns the configured ZooKeeper quorum (and removes whitespace, because ZooKeeper does not
+	 * tolerate it).
+	 */
+	public static String getZooKeeperEnsemble(Configuration flinkConf)
+			throws IllegalConfigurationException {
+
+		String zkQuorum = flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, null);
+
+		if (zkQuorum == null || zkQuorum.equals("")) {
+			throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
+		}
+
+		// Remove all whitespace
+		zkQuorum = zkQuorum.replaceAll("\\s+", "");
+
+		return zkQuorum;
+	}
+}

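The client-side timeouts and retries read above are all overridable in
flink-conf.yaml; a sketch listing the keys with their compiled-in defaults:

    # optional tuning in conf/flink-conf.yaml (values shown are the defaults):
    #   ha.zookeeper.dir: /flink
    #   ha.zookeeper.client.session-timeout: 60000
    #   ha.zookeeper.client.connection-timeout: 15000
    #   ha.zookeeper.client.retry-wait: 5000
    #   ha.zookeeper.client.max-retry-attempts: 3
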
http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
new file mode 100644
index 0000000..d0e706e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Simple wrapper for ZooKeeper's {@link QuorumPeer}, which reads a ZooKeeper config file and writes
+ * the required 'myid' file before starting the peer.
+ */
+public class FlinkZooKeeperQuorumPeer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class);
+
+	public static void main(String[] args) {
+		try {
+			final ParameterTool params = ParameterTool.fromArgs(args);
+			final String zkConfigFile = params.getRequired("zkConfigFile");
+			final int peerId = params.getInt("peerId");
+
+			// Run quorum peer
+			runFlinkZkQuorumPeer(zkConfigFile, peerId);
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			System.exit(-1);
+		}
+	}
+
+	/**
+	 * Runs a ZooKeeper {@link QuorumPeer} if further peers are configured or a single
+	 * {@link ZooKeeperServer} if no further peers are configured.
+	 *
+	 * @param zkConfigFile ZooKeeper config file 'zoo.cfg'
+	 * @param peerId       ID for the 'myid' file
+	 */
+	public static void runFlinkZkQuorumPeer(String zkConfigFile, int peerId) throws Exception {
+
+		Properties zkProps = new Properties();
+
+		InputStream inStream = new FileInputStream(new File(zkConfigFile));
+		zkProps.load(inStream);
+
+		LOG.info("Configuration: " + zkProps);
+
+		// Set defaults for required properties
+		setRequiredProperties(zkProps);
+
+		// Write peer id to myid file
+		writeMyIdToDataDir(zkProps, peerId);
+
+		// The myid file needs to be written before creating the QuorumPeerConfig
+		// instance. Otherwise, parsing the properties below will fail.
+		QuorumPeerConfig conf = new QuorumPeerConfig();
+		conf.parseProperties(zkProps);
+
+		if (conf.isDistributed()) {
+			// Run quorum peer
+			LOG.info("Running distributed ZooKeeper quorum peer (total peers: {}).",
+					conf.getServers().size());
+
+			QuorumPeerMain qp = new QuorumPeerMain();
+			qp.runFromConfig(conf);
+		}
+		else {
+			// Run standalone
+			LOG.info("Running standalone ZooKeeper quorum peer.");
+
+			ZooKeeperServerMain zk = new ZooKeeperServerMain();
+			ServerConfig sc = new ServerConfig();
+			sc.readFrom(conf);
+			zk.runFromConfig(sc);
+		}
+	}
+
+	/**
+	 * Sets required properties to reasonable defaults and logs the changes.
+	 */
+	private static void setRequiredProperties(Properties zkProps) {
+		// Set default client port
+		if (zkProps.getProperty("clientPort") == null) {
+			int clientPort = ConfigConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
+			zkProps.setProperty("clientPort", String.valueOf(clientPort));
+
+			LOG.warn("No 'clientPort' configured. Set to '{}'.", clientPort);
+		}
+
+		// Set default init limit
+		if (zkProps.getProperty("initLimit") == null) {
+			int initLimit = ConfigConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT;
+			zkProps.setProperty("initLimit", String.valueOf(initLimit));
+
+			LOG.warn("No 'initLimit' configured. Set to '{}'.", initLimit);
+		}
+
+		// Set default sync limit
+		if (zkProps.getProperty("syncLimit") == null) {
+			int syncLimit = ConfigConstants.DEFAULT_ZOOKEEPER_SYNC_LIMIT;
+			zkProps.setProperty("syncLimit", String.valueOf(syncLimit));
+
+			LOG.warn("No 'syncLimit' configured. Set to '{}'.", syncLimit);
+		}
+
+		// Set default data dir
+		if (zkProps.getProperty("dataDir") == null) {
+			String dataDir = String.format("%s/%s/zookeeper",
+					System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+
+			zkProps.setProperty("dataDir", dataDir);
+
+			LOG.warn("No 'dataDir' configured. Set to '{}'.", dataDir);
+		}
+
+		int peerPort = ConfigConstants.DEFAULT_ZOOKEEPER_PEER_PORT;
+		int leaderPort = ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PORT;
+
+		// Set peer and leader ports if none given, because ZooKeeper complains if multiple
+		// servers are configured, but no ports are given.
+		for (Map.Entry<Object, Object> entry : zkProps.entrySet()) {
+			String key = (String) entry.getKey();
+
+			if (entry.getKey().toString().startsWith("server.")) {
+				String value = (String) entry.getValue();
+				String[] parts = value.split(":");
+
+				if (parts.length == 1) {
+					String address = String.format("%s:%d:%d", parts[0], peerPort, leaderPort);
+					zkProps.setProperty(key, address);
+					LOG.info("Set peer and leader port of '{}': '{}' => '{}'.",
+							key, value, address);
+				}
+				else if (parts.length == 2) {
+					String address = String.format("%s:%d:%d",
+							parts[0], Integer.valueOf(parts[1]), leaderPort);
+					zkProps.setProperty(key, address);
+					LOG.info("Set peer port of '{}': '{}' => '{}'.", key, value, address);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Write 'myid' file to the 'dataDir' in the given ZooKeeper configuration.
+	 *
+	 * <blockquote>
+	 * Every machine that is part of the ZooKeeper ensemble should know about every other machine in
+	 * the ensemble. You accomplish this with the series of lines of the form
+	 * server.id=host:port:port. The parameters host and port are straightforward. You attribute the
+	 * server id to each machine by creating a file named myid, one for each server, which resides
+	 * in that server's data directory, as specified by the configuration file parameter dataDir.
+	 * </blockquote>
+	 *
+	 * @param zkProps ZooKeeper configuration.
+	 * @param id      The ID of this {@link QuorumPeer}.
+	 * @throws IllegalConfigurationException Thrown, if 'dataDir' property not set in given
+	 *                                       ZooKeeper properties.
+	 * @throws IOException                   Thrown, if 'dataDir' does not exist and cannot be
+	 *                                       created.
+	 * @see <a href="http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html">
+	 * ZooKeeper Administrator's Guide</a>
+	 */
+	private static void writeMyIdToDataDir(Properties zkProps, int id) throws IOException {
+
+		// Check dataDir and create if necessary
+		if (zkProps.getProperty("dataDir") == null) {
+			throw new IllegalConfigurationException("No dataDir configured.");
+		}
+
+		File dataDir = new File(zkProps.getProperty("dataDir"));
+
+		if (!dataDir.isDirectory() && !dataDir.mkdirs()) {
+			throw new IOException("Cannot create dataDir '" + dataDir + "'.");
+		}
+
+		dataDir.deleteOnExit();
+
+		// Write myid to file
+		PrintWriter writer = null;
+		try {
+			LOG.info("Writing {} to myid file in 'dataDir'.", id);
+
+			writer = new PrintWriter(new File(dataDir, "myid"));
+			writer.println(id);
+		}
+		finally {
+			if (writer != null) {
+				writer.close();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c917d4a..0d96edb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -18,49 +18,49 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import java.io.{IOException, File}
+import java.io.{File, IOException}
 import java.net.InetSocketAddress
 import java.util.Collections
 
-import akka.actor.Status.{Success, Failure}
+import akka.actor.Status.{Failure, Success}
+import akka.actor._
 import grizzled.slf4j.Logger
-import org.apache.flink.api.common.{JobID, ExecutionConfig}
-import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
+import org.apache.flink.api.common.{ExecutionConfig, JobID}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
+import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.client._
-import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
+import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages._
+import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators._
-import org.apache.flink.runtime.messages.checkpoint.{AcknowledgeCheckpoint, AbstractCheckpointMessage}
+import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
+import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
-import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
-import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
-import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
+import org.apache.flink.runtime.util.{EnvironmentInformation, SerializedValue, ZooKeeperUtil}
+import org.apache.flink.runtime.{ActorLogMessages, ActorSynchronousLogging, StreamingMode}
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
 
-import akka.actor._
-
+import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.collection.JavaConverters._
 
 /**
  * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the
@@ -969,6 +969,13 @@ object JobManager {
       } text {
         "The streaming mode of the JobManager (STREAMING / BATCH)"
       }
+
+      opt[String]("host").optional().action { (arg, conf) =>
+        conf.setHost(arg)
+        conf
+      } text {
+        "Network address for communication with the job manager"
+      }
     }
 
     val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
@@ -993,9 +1000,34 @@ object JobManager {
       configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
     }
 
-    val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-    val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+    // HA mode
+    val (hostname, port) = if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+      // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is
+      // chosen.  For the FlinkMiniCluster you have to choose it on your own.
+      LOG.info("HA mode.")
+
+      if (config.getHost == null) {
+        throw new Exception("Missing parameter '--host'.")
+      }
+
+      // Let web server listen on random port
+      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
+
+      (config.getHost, NetUtils.getAvailablePort)
+    }
+    else {
+      if (config.getHost != null) {
+        throw new IllegalStateException("Specified explicit address for JobManager communication " +
+          "via CLI, but no ZooKeeper quorum has been configured. The task managers will not be " +
+          "able to find the correct JobManager to connect to. Please configure ZooKeeper or " +
+          "don't set the address explicitly (this will fallback to the address configured in " +
+          "in 'conf/flink-conf.yaml'.")
+      }
+
+      (configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null),
+        configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT))
+    }
 
     (configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1c60e89..4f95c1c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -61,7 +61,7 @@ import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
-import org.apache.flink.runtime.util.{MathUtils, EnvironmentInformation}
+import org.apache.flink.runtime.util.{ZooKeeperUtil, MathUtils, EnvironmentInformation}
 
 import scala.concurrent._
 import scala.concurrent.duration._
@@ -1435,6 +1435,11 @@ object TaskManager {
     // start the I/O manager last, it will create some temp directories.
     val ioManager: IOManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths)
 
+    if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+      // TODO @removeme @tillrohrmann Setup leader retrieval service
+      LOG.info("HA mode.")
+    }
+
     // create the actor properties (which define the actor constructor parameters)
     val tmProps = Props(taskManagerClass,
       taskManagerConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
new file mode 100644
index 0000000..da40e15
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ZooKeeperUtilTest {
+
+	@Test
+	public void testZooKeeperEnsembleConnectStringConfiguration() throws Exception {
+		// ZooKeeper does not like whitespace in the quorum connect String.
+		String actual, expected;
+		Configuration conf = new Configuration();
+
+		{
+			expected = "localhost:2891";
+
+			setQuorum(conf, expected);
+			actual = ZooKeeperUtil.getZooKeeperEnsemble(conf);
+			assertEquals(expected, actual);
+
+			setQuorum(conf, " localhost:2891 "); // with leading and trailing whitespace
+			actual = ZooKeeperUtil.getZooKeeperEnsemble(conf);
+			assertEquals(expected, actual);
+
+			setQuorum(conf, "localhost :2891"); // whitespace after port
+			actual = ZooKeeperUtil.getZooKeeperEnsemble(conf);
+			assertEquals(expected, actual);
+		}
+
+		{
+			expected = "localhost:2891,localhost:2891";
+
+			setQuorum(conf, "localhost:2891,localhost:2891");
+			actual = ZooKeeperUtil.getZooKeeperEnsemble(conf);
+			assertEquals(expected, actual);
+
+			setQuorum(conf, "localhost:2891, localhost:2891");
+			actual = ZooKeeperUtil.getZooKeeperEnsemble(conf);
+			assertEquals(expected, actual);
+
+			setQuorum(conf, "localhost :2891, localhost:2891");
+			actual = ZooKeeperUtil.getZooKeeperEnsemble(conf);
+			assertEquals(expected, actual);
+
+			setQuorum(conf, " localhost:2891, localhost:2891 ");
+			actual = ZooKeeperUtil.getZooKeeperEnsemble(conf);
+			assertEquals(expected, actual);
+		}
+	}
+
+	private Configuration setQuorum(Configuration conf, String quorum) {
+		conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, quorum);
+		return conf;
+	}
+}
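
The test above pins down only the observable behavior. A hedged sketch of the normalization it implies (the actual ZooKeeperUtil implementation may differ) is simply stripping all whitespace from the configured quorum string:

public class QuorumNormalizationSketch {
	public static void main(String[] args) {
		String configured = " localhost:2891, localhost:2892 ";
		// remove all whitespace, since ZooKeeper rejects it in the connect string
		String ensemble = configured.replaceAll("\\s+", "");
		System.out.println(ensemble); // prints localhost:2891,localhost:2892
	}
}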

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6336485..1f63ff3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,8 @@ under the License.
 		<chill.version>0.5.2</chill.version>
 		<asm.version>5.0.4</asm.version>
 		<tez.version>0.6.1</tez.version>
+		<zookeeper.version>3.4.6</zookeeper.version>
+		<curatorrecipes.version>2.8.0</curatorrecipes.version>
 	</properties>
 
 	<dependencies>
@@ -322,6 +324,18 @@ under the License.
 					</exclusion>
 				</exclusions>
 			</dependency>
+
+			<dependency>
+				<groupId>org.apache.zookeeper</groupId>
+				<artifactId>zookeeper</artifactId>
+				<version>${zookeeper.version}</version>
+			</dependency>
+
+			<dependency>
+				<groupId>org.apache.curator</groupId>
+				<artifactId>curator-recipes</artifactId>
+				<version>${curatorrecipes.version}</version>
+			</dependency>
 		</dependencies>
 	</dependencyManagement>
 
@@ -730,6 +744,8 @@ under the License.
 						<exclude>flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/**</exclude>
 						<!-- Configuration Files. -->
 						<exclude>**/flink-bin/conf/slaves</exclude>
+						<exclude>**/flink-bin/conf/masters</exclude>
+						<exclude>**/flink-bin/conf/zoo.cfg</exclude>
                         <exclude>flink-contrib/docker-flink/flink/conf/slaves</exclude>
 						<!-- Administrative files in the main trunk. -->
 						<exclude>**/README.md</exclude>


[2/9] flink git commit: [FLINK-2124] [streaming] Fix behavior of FromElementsFunction when T isn't Serializable

Posted by se...@apache.org.
[FLINK-2124] [streaming] Fix behavior of FromElementsFunction when T isn't Serializable

This change rewrites the FromElementsFunction to work with arbitrary types. The elements
are serialized to a byte array (using a TypeSerializer) when the FromElementsFunction is
constructed and deserialized when run is called.

This closes #848
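
To make that round trip concrete, here is a minimal, self-contained Java sketch of the same idea, using IntSerializer and the wrapper classes that appear in the diff below (the class name is made up for illustration):

import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class SerializedElementsSketch {
	public static void main(String[] args) throws Exception {
		// serialize the elements once, at construction time
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		OutputViewDataOutputStreamWrapper out =
				new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
		for (int i : new int[] { 1, 2, 3 }) {
			IntSerializer.INSTANCE.serialize(i, out);
		}
		// a plain byte[] is always Java-serializable, even when T itself is not
		byte[] elements = baos.toByteArray();

		// deserialize the elements again when the source runs
		ByteArrayInputStream bais = new ByteArrayInputStream(elements);
		DataInputView in = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
		while (bais.available() > 0) {
			System.out.println(IntSerializer.INSTANCE.deserialize(in));
		}
	}
}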


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4a030c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4a030c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4a030c3

Branch: refs/heads/master
Commit: e4a030c307597ff7ebf7aa7cc4435947820db3a0
Parents: 0c49891
Author: Johannes Reifferscheid <j....@gmail.com>
Authored: Wed Jun 17 15:35:37 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java |  6 +-
 .../functions/source/FromElementsFunction.java  | 70 ++++++++++++++++----
 .../flink/streaming/api/SourceFunctionTest.java | 18 +++--
 .../api/scala/StreamExecutionEnvironment.scala  |  4 +-
 4 files changed, 74 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5e7be8d..f98a9f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -473,7 +473,7 @@ public abstract class StreamExecutionEnvironment {
 
 		TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]);
 
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
 
 		return addSource(function, "Elements source").returns(typeInfo);
 	}
@@ -504,7 +504,7 @@ public abstract class StreamExecutionEnvironment {
 		}
 
 		TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
 		checkCollection(data, typeInfo.getTypeClass());
 
 		return addSource(function, "Collection Source").returns(typeInfo);
@@ -529,7 +529,7 @@ public abstract class StreamExecutionEnvironment {
 			throw new IllegalArgumentException("Collection must not be empty");
 		}
 
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
 		checkCollection(data, typeInfo.getTypeClass());
 
 		return addSource(function, "Collection Source").returns(typeInfo);
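
At the user-facing level, the effect of threading the serializer through is that fromElements() no longer requires the element type to implement java.io.Serializable. A hedged sketch under that assumption (the Point type is made up):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FromElementsSketch {
	// a POJO that is deliberately NOT java.io.Serializable
	public static class Point {
		public double x;
		public double y;
		public Point() {}
		public Point(double x, double y) { this.x = x; this.y = y; }
		@Override
		public String toString() { return "(" + x + "," + y + ")"; }
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// works after this change, because the elements travel as
		// serializer-encoded bytes rather than Java-serialized objects
		env.fromElements(new Point(1, 2), new Point(3, 4)).print();
		env.execute("fromElements with a non-serializable type");
	}
}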

http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 736cc73..63eb0ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -17,37 +17,81 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import java.util.Arrays;
-import java.util.Collection;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Iterator;
 
 public class FromElementsFunction<T> implements SourceFunction<T> {
 	
 	private static final long serialVersionUID = 1L;
 
-	private Iterable<T> iterable;
+	private final TypeSerializer<T> serializer;
+	private final byte[] elements;
 
 	private volatile boolean isRunning = true;
 
-	public FromElementsFunction(T... elements) {
-		this.iterable = Arrays.asList(elements);
-	}
+	public FromElementsFunction(TypeSerializer<T> serializer, final T... elements) {
+		this(serializer, new Iterable<T>() {
+			@Override
+			public Iterator<T> iterator() {
+				return new Iterator<T>() {
+					int index = 0;
+
+					@Override
+					public boolean hasNext() {
+						return index < elements.length;
+					}
+
+					@Override
+					public T next() {
+						return elements[index++];
+					}
 
-	public FromElementsFunction(Collection<T> elements) {
-		this.iterable = elements;
+					@Override
+					public void remove() {
+						throw new UnsupportedOperationException();
+					}
+				};
+			}
+		});
 	}
 
-	public FromElementsFunction(Iterable<T> elements) {
-		this.iterable = elements;
+	public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+
+		try {
+			for (T element : elements) {
+				serializer.serialize(element, wrapper);
+			}
+		} catch (IOException e) {
+			// ByteArrayOutputStream doesn't throw IOExceptions when written to
+		}
+		// closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn doesn't do anything.
+
+		this.serializer = serializer;
+		this.elements = baos.toByteArray();
 	}
 
 	@Override
 	public void run(SourceContext<T> ctx) throws Exception {
-		Iterator<T> it = iterable.iterator();
+		T value = serializer.createInstance();
+		ByteArrayInputStream bais = new ByteArrayInputStream(elements);
+		DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
 
-		while (isRunning && it.hasNext()) {
-			ctx.collect(it.next());
+		while (isRunning && bais.available() > 0) {
+			value = serializer.deserialize(value, input);
+			ctx.collect(value);
 		}
+		// closing the DataInputStream would just close the ByteArrayInputStream, which doesn't do anything
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
index 7a78205..f0fe63d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.util.SourceFunctionUtil;
@@ -33,18 +35,22 @@ public class SourceFunctionTest {
 	@Test
 	public void fromElementsTest() throws Exception {
 		List<Integer> expectedList = Arrays.asList(1, 2, 3);
-		List<Integer> actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>(
-				1,
-				2,
-				3));
+		List<Integer> actualList = SourceFunctionUtil.runSourceFunction(CommonTestUtils.createCopySerializable(
+				new FromElementsFunction<Integer>(
+						IntSerializer.INSTANCE,
+						1,
+						2,
+						3)));
 		assertEquals(expectedList, actualList);
 	}
 
 	@Test
 	public void fromCollectionTest() throws Exception {
 		List<Integer> expectedList = Arrays.asList(1, 2, 3);
-		List<Integer> actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>(
-				Arrays.asList(1, 2, 3)));
+		List<Integer> actualList = SourceFunctionUtil.runSourceFunction(
+				CommonTestUtils.createCopySerializable(new FromElementsFunction<Integer>(
+						IntSerializer.INSTANCE,
+						Arrays.asList(1, 2, 3))));
 		assertEquals(expectedList, actualList);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 968e07f..7215a4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -284,8 +284,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     require(data != null, "Data must not be null.")
     val typeInfo = implicitly[TypeInformation[T]]
 
-    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
-      .asJavaCollection(data))
+    val sourceFunction = new FromElementsFunction[T](typeInfo.createSerializer(getConfig),
+      scala.collection.JavaConversions.asJavaCollection(data))
 
     javaEnv.addSource(sourceFunction).returns(typeInfo)
   }


[6/9] flink git commit: [FLINK-2331] Whitelist the 'akka.remote.RemoteTransportExceptionNoStackTrace' to be acceptable in YARN logs

Posted by se...@apache.org.
[FLINK-2331] Whitelist the 'akka.remote.RemoteTransportExceptionNoStackTrace' to be acceptable in YARN logs
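
Condensed into a single hedged helper (the method name is made up), the per-line rule the diff below implements is: a line fails the scan only if it contains a prohibited string and none of the whitelisted strings.

static boolean containsProhibited(String line, String[] prohibited, String[] whitelisted) {
	for (String bad : prohibited) {
		if (line.contains(bad)) {
			boolean whitelistedFound = false;
			for (String white : whitelisted) {
				if (line.contains(white)) {
					whitelistedFound = true;
					break;
				}
			}
			if (!whitelistedFound) {
				// prohibited marker with no whitelisted pattern on the same line
				return true;
			}
		}
	}
	return false;
}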


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c49891f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c49891f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c49891f

Branch: refs/heads/master
Commit: 0c49891fbc2738fc3d308cefeebb6434f84c2592
Parents: 706c391
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 8 19:22:26 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../YARNSessionCapacitySchedulerITCase.java     |  2 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  2 +-
 .../org/apache/flink/yarn/YarnTestBase.java     | 29 ++++++++++++++++----
 3 files changed, 25 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c49891f/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 4051ff5..7f14ad5 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -82,6 +82,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 
 	@After
 	public void checkForProhibitedLogContents() {
-		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS);
+		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c49891f/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index ac1f971..6f07d36 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -91,7 +91,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
 	@After
 	public void checkForProhibitedLogContents() {
-		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS);
+		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0c49891f/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index b002e8f..43285b5 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -79,12 +79,17 @@ public abstract class YarnTestBase {
 
 	protected final static int NUM_NODEMANAGERS = 2;
 
-	// The tests are scanning for these strings in the final output.
+	/** The tests are scanning for these strings in the final output. */
 	protected final static String[] PROHIBITED_STRINGS = {
 			"Exception", // we don't want any exceptions to happen
 			"Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN mode.
 	};
 
+	/** These strings are white-listed, overriding the prohibited strings. */
+	protected final static String[] WHITELISTED_STRINGS = {
+			"akka.remote.RemoteTransportExceptionNoStackTrace"
+	};
+
 	// Temp directory which is deleted after the unit test.
 	@ClassRule
 	public static TemporaryFolder tmp = new TemporaryFolder();
@@ -232,10 +237,11 @@ public abstract class YarnTestBase {
 	 * So always run "mvn clean" before running the tests here.
 	 *
 	 */
-	public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited) {
+	public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) {
 		File cwd = new File("target/"+yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
 		Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists());
 		Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
+		
 		File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
 			@Override
 			public boolean accept(File dir, String name) {
@@ -247,10 +253,21 @@ public abstract class YarnTestBase {
 					final String lineFromFile = scanner.nextLine();
 					for (String aProhibited : prohibited) {
 						if (lineFromFile.contains(aProhibited)) {
-							// logging in FATAL to see the actual message in TRAVIS tests.
-							Marker fatal = MarkerFactory.getMarker("FATAL");
-							LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
-							return true;
+							
+							boolean whitelistedFound = false;
+							for (String white : whitelisted) {
+								if (lineFromFile.contains(white)) {
+									whitelistedFound = true;
+									break;
+								}
+							}
+							
+							if (!whitelistedFound) {
+								// logging in FATAL to see the actual message in TRAVIS tests.
+								Marker fatal = MarkerFactory.getMarker("FATAL");
+								LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
+								return true;
+							}
 						}
 					}
 


[5/9] flink git commit: [FLINK-2288] [runtime] Cleanups and comments for ZooKeeper based initialization

Posted by se...@apache.org.
[FLINK-2288] [runtime] Cleanups and comments for ZooKeeper based initialization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/535475c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/535475c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/535475c2

Branch: refs/heads/master
Commit: 535475c2131b9bc352bc7268f022a1bdce206f2e
Parents: 9c0dd97
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 8 15:02:33 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 docs/setup/jobmanager_high_availability.md      |  2 +-
 flink-dist/src/main/resources/flink-conf.yaml   | 37 +++++++++++++
 .../flink/runtime/security/SecurityUtils.java   | 10 +++-
 .../zookeeper/FlinkZooKeeperQuorumPeer.java     | 26 +++++----
 .../flink/runtime/jobmanager/JobManager.scala   | 58 +++++++++++---------
 5 files changed, 92 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index dec0cdc..8958e17 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -77,7 +77,7 @@ server.X=addressX:peerPort:leaderPort
 server.Y=addressY:peerPort:leaderPort
 </pre>
 
-The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some rqeuired configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation.
+The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some required configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation.
 
 ## Example: Start and stop a local HA-cluster with 2 JobManagers
 

http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index a258815..2a287dc 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -21,18 +21,38 @@
 # Common
 #==============================================================================
 
+# The host on which the JobManager runs. Only used in non-high-availability mode.
+# The JobManager process will use this hostname to bind the listening servers to.
+# The TaskManagers will try to connect to the JobManager on that host.
+
 jobmanager.rpc.address: localhost
 
+
+# The port where the JobManager's main actor system listens for messages.
+
 jobmanager.rpc.port: 6123
 
+
+# The heap size for the JobManager JVM
+
 jobmanager.heap.mb: 256
 
+
+# The heap size for the TaskManager JVM
+
 taskmanager.heap.mb: 512
 
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
 taskmanager.numberOfTaskSlots: 1
 
+
+# The parallelism used for programs that did not specify any other parallelism.
+
 parallelism.default: 1
 
+
 #==============================================================================
 # Web Frontend
 #==============================================================================
@@ -42,11 +62,13 @@ parallelism.default: 1
 
 jobmanager.web.port: 8081
 
+
 # The port under which the standalone web client
 # (for job upload and submit) listens.
 
 webclient.port: 8080
 
+
 #==============================================================================
 # Streaming state checkpointing
 #==============================================================================
@@ -58,12 +80,14 @@ webclient.port: 8080
 
 state.backend: jobmanager
 
+
 # Directory for storing checkpoints in a flink supported filesystem
 # Note: State backend must be accessible from the JobManager, use file://
 # only for local setups. 
 #
 # state.backend.fs.checkpointdir: hdfs://checkpoints
 
+
 #==============================================================================
 # Advanced
 #==============================================================================
@@ -72,6 +96,7 @@ state.backend: jobmanager
 #
 # taskmanager.network.numberOfBuffers: 2048
 
+
 # Directories for temporary files.
 #
 # Add a delimited list for multiple directories, using the system directory
@@ -88,6 +113,7 @@ state.backend: jobmanager
 #
 # taskmanager.tmp.dirs: /tmp
 
+
 # Path to the Hadoop configuration directory.
 #
 # This configuration is used when writing into HDFS. Unless specified otherwise,
@@ -98,3 +124,14 @@ state.backend: jobmanager
 # via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
 #
 # fs.hdfs.hadoopconf: /path/to/hadoop/conf/
+
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form
+# "host_1[:peerPort[:leaderPort]],host_2[:peerPort[:leaderPort]],..."
+
+#ha.zookeeper.quorum: localhost
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index 5fde51e..4c0d49a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -24,6 +24,12 @@ import org.slf4j.LoggerFactory;
 
 import java.security.PrivilegedExceptionAction;
 
+/**
+ * A utility class that lets program code run in a security context provided by the
+ * Hadoop security user groups.
+ * 
+ * The secure context will for example pick up authentication information from Kerberos.
+ */
 public class SecurityUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class);
@@ -44,17 +50,15 @@ public class SecurityUtils {
 			LOG.error("Security is enabled but no Kerberos credentials have been found. " +
 						"You may authenticate using the kinit command.");
 		}
-		T ret = ugi.doAs(new PrivilegedExceptionAction<T>() {
+		return ugi.doAs(new PrivilegedExceptionAction<T>() {
 			@Override
 			public T run() throws Exception {
 				return runner.run();
 			}
 		});
-		return ret;
 	}
 
 	public static interface FlinkSecuredRunner<T> {
 		public T run() throws Exception;
 	}
-
 }
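
For context, a hedged usage sketch of the runner interface above, assuming the enclosing runSecured(...) method of this class and a configured Hadoop login user:

String user = SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<String>() {
	@Override
	public String run() throws Exception {
		// runs with the authentication information (e.g. Kerberos) of the security context
		return org.apache.hadoop.security.UserGroupInformation.getCurrentUser().getUserName();
	}
});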

http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
index d0e706e..c9d3ec4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
@@ -21,20 +21,22 @@ package org.apache.flink.runtime.zookeeper;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.PrintWriter;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
@@ -49,6 +51,10 @@ public class FlinkZooKeeperQuorumPeer {
 
 	public static void main(String[] args) {
 		try {
+			// startup checks and logging
+			EnvironmentInformation.logEnvironmentInfo(LOG, "ZooKeeper Quorum Peer", args);
+			EnvironmentInformation.checkJavaVersion();
+			
 			final ParameterTool params = ParameterTool.fromArgs(args);
 			final String zkConfigFile = params.getRequired("zkConfigFile");
 			final int peerId = params.getInt("peerId");
@@ -57,7 +63,7 @@ public class FlinkZooKeeperQuorumPeer {
 			runFlinkZkQuorumPeer(zkConfigFile, peerId);
 		}
 		catch (Throwable t) {
-			t.printStackTrace();
+			LOG.error("Error running ZooKeeper quorum peer: " + t.getMessage(), t);
 			System.exit(-1);
 		}
 	}
@@ -210,18 +216,16 @@ public class FlinkZooKeeperQuorumPeer {
 
 		dataDir.deleteOnExit();
 
-		// Write myid to file
-		PrintWriter writer = null;
+		LOG.info("Writing {} to myid file in 'dataDir'.", id);
+		
+		// Write myid to file. We use a FileWriter, because it properly propagates errors,
+		// while a PrintWriter swallows them
+		FileWriter writer = new FileWriter(new File(dataDir, "myid"));
 		try {
-			LOG.info("Writing {} to myid file in 'dataDir'.", id);
-
-			writer = new PrintWriter(new File(dataDir, "myid"));
-			writer.println(id);
+			writer.write(String.valueOf(id));
 		}
 		finally {
-			if (writer != null) {
-				writer.close();
-			}
+			writer.close();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0d96edb..dc1599a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1000,35 +1000,41 @@ object JobManager {
       configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
     }
 
-    // HA mode
-    val (hostname, port) = if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
-      // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is
-      // chosen.  For the FlinkMiniCluster you have to choose it on your own.
-      LOG.info("HA mode.")
-
-      if (config.getHost == null) {
-        throw new Exception("Missing parameter '--host'.")
+    // high availability mode
+    val (hostname: String, port: Int ) = 
+      if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+        // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is
+        // chosen.  For the FlinkMiniCluster you have to choose it on your own.
+        LOG.info("Starting JobManager in High-Availability Mode")
+  
+        if (config.getHost() == null) {
+          throw new Exception("Missing parameter '--host'. Parameter is required when " +
+            "running in high-availability mode")
+        }
+  
+        // Let web server listen on random port
+        configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
+  
+        (config.getHost(), 0)
       }
-
-      // Let web server listen on random port
-      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-
-      (config.getHost, NetUtils.getAvailablePort)
-    }
-    else {
-      if (config.getHost != null) {
-        throw new IllegalStateException("Specified explicit address for JobManager communication " +
-          "via CLI, but no ZooKeeper quorum has been configured. The task managers will not be " +
-          "able to find the correct JobManager to connect to. Please configure ZooKeeper or " +
-          "don't set the address explicitly (this will fallback to the address configured in " +
-          "in 'conf/flink-conf.yaml'.")
+      else {
+        LOG.info("Staring JobManager without high-availability")
+        
+        if (config.getHost() != null) {
+          throw new Exception("Found an explicit address for JobManager communication " +
+            "via the CLI option '--host'.\n" +
+            "This parameter must only be set if the JobManager is started in high-availability " +
+            "mode and connects to a ZooKeeper quorum.\n" +
+            "Please configure ZooKeeper or don't set the '--host' option, so that the JobManager " +
+            "uses the address configured under 'conf/flink-conf.yaml'.")
+        }
+  
+        val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+        val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+            ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+        (host, port)
       }
 
-      (configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null),
-        configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT))
-    }
-
     (configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port)
   }
 


[3/9] flink git commit: [FLINK-2288] [docs] Add docs for HA/ZooKeeper setup

Posted by se...@apache.org.
[FLINK-2288] [docs] Add docs for HA/ZooKeeper setup

This closes #886


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c0dd974
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c0dd974
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c0dd974

Branch: refs/heads/master
Commit: 9c0dd9742966011322be36343611146ed7b862f0
Parents: 8c72b50
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Jul 3 16:45:15 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 docs/_includes/navbar.html                 |   1 +
 docs/page/css/flink.css                    |  14 ++-
 docs/setup/fig/jobmanager_ha_overview.png  | Bin 0 -> 57875 bytes
 docs/setup/jobmanager_high_availability.md | 121 ++++++++++++++++++++++++
 4 files changed, 133 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c0dd974/docs/_includes/navbar.html
----------------------------------------------------------------------
diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index 740ec9f..dc7ef30 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -53,6 +53,7 @@ under the License.
                 <li><a href="{{ setup }}/yarn_setup.html">YARN</a></li>
                 <li><a href="{{ setup }}/gce_setup.html">GCloud</a></li>
                 <li><a href="{{ setup }}/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
+                <li><a href="{{ setup }}/jobmanager_high_availability.html">JobManager High Availability</a></li>
 
                 <li class="divider"></li>
                 <li><a href="{{ setup }}/config.html">Configuration</a></li>

http://git-wip-us.apache.org/repos/asf/flink/blob/9c0dd974/docs/page/css/flink.css
----------------------------------------------------------------------
diff --git a/docs/page/css/flink.css b/docs/page/css/flink.css
index 9074e23..3b09e54 100644
--- a/docs/page/css/flink.css
+++ b/docs/page/css/flink.css
@@ -113,11 +113,19 @@ h2, h3 {
 
 code {
 	background: #f5f5f5;
-  padding: 0;
-  color: #333333;
-  font-family: "Menlo", "Lucida Console", monospace;
+	padding: 0;
+	color: #333333;
+	font-family: "Menlo", "Lucida Console", monospace;
 }
 
 pre {
 	font-size: 85%;
 }
+
+img.center {
+	display: block;
+	margin-left: auto;
+	margin-right: auto;
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/9c0dd974/docs/setup/fig/jobmanager_ha_overview.png
----------------------------------------------------------------------
diff --git a/docs/setup/fig/jobmanager_ha_overview.png b/docs/setup/fig/jobmanager_ha_overview.png
new file mode 100644
index 0000000..ff82cae
Binary files /dev/null and b/docs/setup/fig/jobmanager_ha_overview.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/9c0dd974/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
new file mode 100644
index 0000000..dec0cdc
--- /dev/null
+++ b/docs/setup/jobmanager_high_availability.md
@@ -0,0 +1,121 @@
+---
+title: "JobManager High Availability (HA)"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+The JobManager is the coordinator of each Flink deployment. It is responsible for both *scheduling* and *resource management*.
+
+By default, there is a single JobManager instance per Flink cluster. This creates a *single point of failure* (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail.
+
+With JobManager High Availability, you can run multiple JobManager instances per Flink cluster and thereby circumvent the *SPOF*.
+
+The general idea of JobManager high availability is that there is a **single leading JobManager** at any time and **multiple standby JobManagers** to take over leadership in case the leader fails. This guarantees that there is **no single point of failure** and programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master or standby.
+
+As an example, consider the following setup with three JobManager instances:
+
+<img src="fig/jobmanager_ha_overview.png" class="center" />
+
+## Configuration
+
+To enable JobManager High Availability you have to configure a **ZooKeeper quorum** and set up a **masters file** with all JobManager hosts.
+
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper.
+
+Configuring a ZooKeeper quorum in `conf/flink-conf.yaml` *enables* high availability mode and all Flink components try to connect to a JobManager via coordination through ZooKeeper.
+
+- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
+  
+  <pre>ha.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
+
+  Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port.
+
+- The following configuration keys are optional:
+
+  - `ha.zookeeper.dir: /flink [default]`: ZooKeeper directory to use for coordination
+  - TODO Add client configuration keys
+
+## Starting an HA-cluster
+
+In order to start an HA-cluster configure the *masters* file in `conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts on which JobManagers are started.
+
+  <pre>
+jobManagerAddress1
+[...]
+jobManagerAddressX
+  </pre>
+
+After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. **Keep in mind that the ZooKeeper quorum has to be running when you call the scripts**.
+
+## Running ZooKeeper
+
+If you don't have a running ZooKeeper installation, you can use the helper scripts, which ship with Flink.
+
+There is a ZooKeeper configuration template in `conf/zoo.cfg`. You can configure the hosts to run ZooKeeper on with the `server.X` entries, where X is a unique ID of each server:
+
+<pre>
+server.X=addressX:peerPort:leaderPort
+[...]
+server.Y=addressY:peerPort:leaderPort
+</pre>
+
+The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some rqeuired configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation.
+
+## Example: Start and stop a local HA-cluster with 2 JobManagers
+
+1. **Configure ZooKeeper quorum** in `conf/flink-conf.yaml`:
+   
+   <pre>ha.zookeeper.quorum: localhost</pre>
+
+2. **Configure masters** in `conf/masters`:
+
+   <pre>
+localhost
+localhost</pre>
+
+3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine):
+
+   <pre>server.0=localhost:2888:3888</pre>
+
+4. **Start ZooKeeper quorum**:
+
+   <pre>
+$ bin/start-zookeeper-quorum.sh
+Starting zookeeper daemon on host localhost.</pre>
+
+5. **Start an HA-cluster**:
+   
+   <pre>
+$ bin/start-cluster-streaming.sh
+Starting HA cluster (streaming mode) with 2 masters and 1 peers in ZooKeeper quorum.
+Starting jobmanager daemon on host localhost.
+Starting jobmanager daemon on host localhost.
+Starting taskmanager daemon on host localhost.</pre>
+
+6. **Stop ZooKeeper quorum and cluster**:
+
+   <pre>
+$ bin/stop-cluster.sh
+Stopping taskmanager daemon (pid: 7647) on localhost.
+Stopping jobmanager daemon (pid: 7495) on host localhost.
+Stopping jobmanager daemon (pid: 7349) on host localhost.
+$ bin/stop-zookeeper-quorum.sh
+Stopping zookeeper daemon (pid: 7101) on host localhost.</pre>


[4/9] flink git commit: [FLINK-2208] [runtime] Fix reflective access to CPU load to support IBM Java

Posted by se...@apache.org.
[FLINK-2208] [runtime] Fix reflective access to CPU load to support IBM Java
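
A minimal, standalone Java sketch of the same probing strategy (the class name is made up): look the method up reflectively instead of linking against com.sun.management, so the code still loads on JVMs, such as IBM J9, that do not ship that class.

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.Method;

public class CpuLoadProbeSketch {
	public static void main(String[] args) {
		OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
		Method fetch = null;
		try {
			for (Method m : Class.forName("com.sun.management.OperatingSystemMXBean").getMethods()) {
				if (m.getName().equals("getProcessCpuLoad")) {
					fetch = m;
					break;
				}
			}
		} catch (Throwable t) {
			// class not present on this JVM - CPU load stays unavailable
		}
		try {
			double load = (fetch == null) ? -1.0 : (Double) fetch.invoke(osBean);
			System.out.println("process CPU load: " + load);
		} catch (Throwable t) {
			System.out.println("process CPU load unavailable: " + t.getMessage());
		}
	}
}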


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60cfa0b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60cfa0b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60cfa0b2

Branch: refs/heads/master
Commit: 60cfa0b2dcb4dae1637d867c4c63c67b3d26adf9
Parents: 8acc0d2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 8 11:58:00 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala | 47 +++++++++-----------
 1 file changed, 21 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60cfa0b2/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 612d5c0..1c60e89 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -22,7 +22,7 @@ import java.io.{File, IOException}
 import java.net.{InetAddress, InetSocketAddress}
 import java.util.concurrent.TimeUnit
 import java.lang.reflect.Method
-import java.lang.management.ManagementFactory
+import java.lang.management.{OperatingSystemMXBean, ManagementFactory}
 
 import akka.actor._
 import akka.pattern.ask
@@ -35,7 +35,7 @@ import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
 import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 
-import org.apache.flink.configuration._
+import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
 import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage}
 import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -1752,40 +1752,35 @@ object TaskManager {
         ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
     })
 
-    // Preprocessing steps for registering cpuLoad
-    val fetchCPULoad = getMethodToFetchCPULoad()
-
-    // Log getProcessCpuLoad unavailable for Java 6
-    if(fetchCPULoad.isEmpty){
-      LOG.warn("getProcessCpuLoad method not available in the Operating System Bean" +
-        "implementation for this Java runtime environment\n" +
-        Thread.currentThread().getStackTrace)
-    }
+    // Pre-processing steps for registering cpuLoad
+    val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean()
+        
+    val fetchCPULoadMethod: Option[Method] = 
+      try {
+        Class.forName("com.sun.management.OperatingSystemMXBean")
+          .getMethods()
+          .find( _.getName() == "getProcessCpuLoad" )
+      }
+      catch {
+        case t: Throwable =>
+          LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+            " - CPU load metrics will not be available.")
+          None
+      }
 
     metricRegistry.register("cpuLoad", new Gauge[Double] {
       override def getValue: Double = {
         try{
-          fetchCPULoad.map(_.invoke(ManagementFactory.getOperatingSystemMXBean().
-            asInstanceOf[com.sun.management.OperatingSystemMXBean]).
-            asInstanceOf[Double]).getOrElse(-1)
-        } catch {
+          fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
+        }
+        catch {
           case t: Throwable => {
             LOG.warn("Error retrieving CPU Load through OperatingSystemMXBean", t)
-            -1
+            -1.0
           }
         }
       }
     })
     metricRegistry
   }
-
-  /**
-   * Fetches getProcessCpuLoad method if available in the
-   *  OperatingSystemMXBean implementation else returns None
-   * @return
-   */
-  private def getMethodToFetchCPULoad(): Option[Method] = {
-    val methodsList = classOf[com.sun.management.OperatingSystemMXBean].getMethods()
-    methodsList.filter(_.getName == "getProcessCpuLoad").headOption
-  }
 }


[8/9] flink git commit: [FLINK-2327] [runtime] Log the limit of open file handles at TaskManager startup

Posted by se...@apache.org.
[FLINK-2327] [runtime] Log the limit of open file handles at TaskManager startup
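
The new utility returns -1 as a sentinel when the limit cannot be determined, so the startup log must branch on a value different from -1. A hedged Java rendering of the intended logging (LOG is assumed to be an slf4j logger):

long limit = EnvironmentInformation.getOpenFileHandlesLimit();
if (limit != -1L) {
	LOG.info("Maximum number of open file descriptors is {}", limit);
} else {
	LOG.info("Cannot determine the maximum number of open file descriptors");
}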


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/706c391b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/706c391b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/706c391b

Branch: refs/heads/master
Commit: 706c391b11ac6fc8c1487cbe43b20b5510c7a459
Parents: 535475c
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 8 15:47:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../runtime/util/EnvironmentInformation.java    | 29 ++++++++++++++++++++
 .../flink/runtime/taskmanager/TaskManager.scala |  7 +++++
 .../util/EnvironmentInformationTest.java        |  1 +
 3 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/706c391b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 4efdf11..630e227 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
 import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Properties;
 
@@ -202,6 +203,34 @@ public class EnvironmentInformation {
 	}
 
 	/**
+	 * Tries to retrieve the maximum number of open file handles. This method will only work on
+	 * UNIX-based operating systems with Sun/Oracle Java versions.
+	 * 
+	 * <p>If the number of max open file handles cannot be determined, this method returns {@code -1}.</p>
+	 * 
+	 * @return The limit of open file handles, or {@code -1}, if the limit could not be determined.
+	 */
+	public static long getOpenFileHandlesLimit() {
+		Class<?> sunBeanClass;
+		try {
+			sunBeanClass = Class.forName("com.sun.management.UnixOperatingSystemMXBean");
+		}
+		catch (ClassNotFoundException e) {
+			return -1L;
+		}
+		
+		try {
+			Method fhLimitMethod = sunBeanClass.getMethod("getMaxFileDescriptorCount");
+			Object result = fhLimitMethod.invoke(ManagementFactory.getOperatingSystemMXBean());
+			return (Long) result;
+		}
+		catch (Throwable t) {
+			LOG.warn("Unexpected error when accessing file handle limit", t);
+			return -1L;
+		}
+	}
+	
+	/**
 	 * Logs information about the environment, like code revision, current user, java version,
 	 * and JVM parameters.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/706c391b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 4f95c1c..9bdf1a8 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1032,6 +1032,13 @@ object TaskManager {
     EnvironmentInformation.logEnvironmentInfo(LOG.logger, "TaskManager", args)
     EnvironmentInformation.checkJavaVersion()
 
+    val maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit()
+    if (maxOpenFileHandles != -1) {
+      LOG.info(s"Maximum number of open file descriptors is $maxOpenFileHandles")
+    } else {
+      LOG.info("Cannot determine the maximum number of open file descriptors")
+    }
+    
     // try to parse the command line arguments
     val (configuration: Configuration,
          mode: StreamingMode) = try {

http://git-wip-us.apache.org/repos/asf/flink/blob/706c391b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
index 764cb57..771c57c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/EnvironmentInformationTest.java
@@ -56,6 +56,7 @@ public class EnvironmentInformationTest {
 			assertNotNull(EnvironmentInformation.getRevisionInformation());
 			assertNotNull(EnvironmentInformation.getVersion());
 			assertNotNull(EnvironmentInformation.getUserRunning());
+			assertTrue(EnvironmentInformation.getOpenFileHandlesLimit() >= -1);
 		}
 		catch (Exception e) {
 			e.printStackTrace();