You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 14:48:43 UTC

[64/73] [abbrv] prefix all projects in addons and quickstarts with flink-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
new file mode 100644
index 0000000..3a58afc
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.AggregatorRegistry;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
+import org.apache.flink.api.java.record.operators.CoGroupOperator;
+import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.ReflectionUtil;
+
+public class SpargelIteration {
+	
+	private static final String DEFAULT_NAME = "<unnamed vertex-centric iteration>";
+	
+	private final DeltaIteration iteration;
+	
+	private final Class<? extends Key<?>> vertexKey;
+	private final Class<? extends Value> vertexValue;
+	private final Class<? extends Value> messageType;
+	private final Class<? extends Value> edgeValue;
+	
+	private final CoGroupOperator vertexUpdater;
+	private final CoGroupOperator messager;
+	
+	
+	// ----------------------------------------------------------------------------------
+	
+	public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value>
+			SpargelIteration(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+			VertexUpdateFunction<VertexKey, VertexValue, Message> uf)
+	{
+		this(mf, uf, DEFAULT_NAME);
+	}
+	
+	public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> SpargelIteration(
+			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+			String name)
+	{
+		// get the types
+		this.vertexKey = ReflectionUtil.getTemplateType1(mf.getClass());
+		this.vertexValue = ReflectionUtil.getTemplateType2(mf.getClass());
+		this.messageType = ReflectionUtil.getTemplateType3(mf.getClass());
+		this.edgeValue = ReflectionUtil.getTemplateType4(mf.getClass());
+		
+		if (vertexKey == null || vertexValue == null || messageType == null || edgeValue == null) {
+			throw new RuntimeException();
+		}
+	
+		// instantiate the data flow
+		this.iteration = new DeltaIteration(0, name);
+		
+		this.messager = CoGroupOperator.builder(MessagingDriver.class, vertexKey, 0, 0)
+			.input2(iteration.getWorkset())
+			.name("Message Sender")
+			.build();
+		this.vertexUpdater = CoGroupOperator.builder(VertexUpdateDriver.class, vertexKey, 0, 0)
+			.input1(messager)
+			.input2(iteration.getSolutionSet())
+			.name("Vertex Updater")
+			.build();
+		
+		iteration.setNextWorkset(vertexUpdater);
+		iteration.setSolutionSetDelta(vertexUpdater);
+		
+		// parameterize the data flow
+		try {
+			Configuration vertexUdfParams = vertexUpdater.getParameters();
+			InstantiationUtil.writeObjectToConfig(uf, vertexUdfParams, VertexUpdateDriver.UDF_PARAM);
+			vertexUdfParams.setClass(VertexUpdateDriver.KEY_PARAM, vertexKey);
+			vertexUdfParams.setClass(VertexUpdateDriver.VALUE_PARAM, vertexValue);
+			vertexUdfParams.setClass(VertexUpdateDriver.MESSAGE_PARAM, messageType);
+			
+			Configuration messageUdfParams = messager.getParameters();
+			InstantiationUtil.writeObjectToConfig(mf, messageUdfParams, MessagingDriver.UDF_PARAM);
+			messageUdfParams.setClass(MessagingDriver.KEY_PARAM, vertexKey);
+			messageUdfParams.setClass(MessagingDriver.VALUE_PARAM, vertexValue);
+			messageUdfParams.setClass(MessagingDriver.MESSAGE_PARAM, messageType);
+			messageUdfParams.setClass(MessagingDriver.EDGE_PARAM, edgeValue);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Could not serialize the UDFs for distribution" + 
+					(e.getMessage() == null ? '.' : ": " + e.getMessage()), e);
+		}
+	}
+	
+	// ----------------------------------------------------------------------------------
+	//  inputs and outputs
+	// ----------------------------------------------------------------------------------
+	
+	public void setVertexInput(Operator<Record> c) {
+		this.iteration.setInitialSolutionSet(c);
+		this.iteration.setInitialWorkset(c);
+	}
+	
+	public void setEdgesInput(Operator<Record> c) {
+		this.messager.setFirstInput(c);
+	}
+	
+	public Operator<?> getOutput() {
+		return this.iteration;
+	}
+	
+	public void setDegreeOfParallelism(int dop) {
+		this.iteration.setDegreeOfParallelism(dop);
+	}
+	
+	public void setNumberOfIterations(int iterations) {
+		this.iteration.setMaximumNumberOfIterations(iterations);
+	}
+	
+	public AggregatorRegistry getAggregators() {
+		return this.iteration.getAggregators();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Wrapping UDFs
+	// --------------------------------------------------------------------------------------------
+	
+	@ConstantFieldsFirst(0)
+	public static final class VertexUpdateDriver<K extends Key<K>, V extends Value, M extends Value> extends CoGroupFunction {
+		
+		private static final long serialVersionUID = 1L;
+		
+		private static final String UDF_PARAM = "spargel.udf";
+		private static final String KEY_PARAM = "spargel.key-type";
+		private static final String VALUE_PARAM = "spargel.value-type";
+		private static final String MESSAGE_PARAM = "spargel.message-type";
+		
+		private VertexUpdateFunction<K, V, M> vertexUpdateFunction;
+		
+		private K vertexKey;
+		private V vertexValue;
+		private MessageIterator<M> messageIter;
+
+		@Override
+		public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception {
+
+			if (vertex.hasNext()) {
+				Record first = vertex.next();
+				first.getFieldInto(0, vertexKey);
+				first.getFieldInto(1, vertexValue);
+				messageIter.setSource(messages);
+				vertexUpdateFunction.setOutput(first, out);
+				vertexUpdateFunction.updateVertex(vertexKey, vertexValue, messageIter);
+			} else {
+				if (messages.hasNext()) {
+					String message = "Target vertex does not exist!.";
+					try {
+						Record next = messages.next();
+						next.getFieldInto(0, vertexKey);
+						message = "Target vertex '" + vertexKey + "' does not exist!.";
+					} catch (Throwable t) {}
+					throw new Exception(message);
+				} else {
+					throw new Exception();
+				}
+			}
+		}
+		
+		@SuppressWarnings("unchecked")
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// instantiate only the first time
+			if (vertexUpdateFunction == null) {
+				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class);
+				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class);
+				Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
+				
+				vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
+				vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
+				messageIter = new MessageIterator<M>(InstantiationUtil.instantiate(messageClass, Value.class));
+				
+				try {
+					this.vertexUpdateFunction = (VertexUpdateFunction<K, V, M>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader());
+				} catch (Exception e) {
+					String message = e.getMessage() == null ? "." : ": " + e.getMessage();
+					throw new Exception("Could not instantiate VertexUpdateFunction" + message, e);
+				}
+				
+				this.vertexUpdateFunction.init(getIterationRuntimeContext());
+				this.vertexUpdateFunction.setup(parameters);
+			}
+			this.vertexUpdateFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.vertexUpdateFunction.postSuperstep();
+		}
+	}
+	
+	public static final class MessagingDriver<K extends Key<K>, V extends Value, M extends Value, E extends Value> extends CoGroupFunction {
+
+		private static final long serialVersionUID = 1L;
+		
+		private static final String UDF_PARAM = "spargel.udf";
+		private static final String KEY_PARAM = "spargel.key-type";
+		private static final String VALUE_PARAM = "spargel.value-type";
+		private static final String MESSAGE_PARAM = "spargel.message-type";
+		private static final String EDGE_PARAM = "spargel.edge-value";
+		
+		
+		private MessagingFunction<K, V, M, E> messagingFunction;
+		
+		private K vertexKey;
+		private V vertexValue;
+		
+		@Override
+		public void coGroup(Iterator<Record> edges, Iterator<Record> state, Collector<Record> out) throws Exception {
+			if (state.hasNext()) {
+				Record first = state.next();
+				first.getFieldInto(0, vertexKey);
+				first.getFieldInto(1, vertexValue);
+				messagingFunction.set(edges, out);
+				messagingFunction.sendMessages(vertexKey, vertexValue);
+			}
+		}
+		
+		@SuppressWarnings("unchecked")
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// instantiate only the first time
+			if (messagingFunction == null) {
+				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class);
+				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class);
+//				Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
+				Class<E> edgeClass = parameters.getClass(EDGE_PARAM, null, Value.class);
+				
+				vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
+				vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
+				
+				K edgeKeyHolder = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
+				E edgeValueHolder = InstantiationUtil.instantiate(edgeClass, Value.class);
+				
+				try {
+					this.messagingFunction = (MessagingFunction<K, V, M, E>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader());
+				} catch (Exception e) {
+					String message = e.getMessage() == null ? "." : ": " + e.getMessage();
+					throw new Exception("Could not instantiate MessagingFunction" + message, e);
+				}
+				
+				this.messagingFunction.init(getIterationRuntimeContext(), edgeKeyHolder, edgeValueHolder);
+				this.messagingFunction.setup(parameters);
+			}
+			this.messagingFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.messagingFunction.postSuperstep();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
new file mode 100644
index 0000000..37e32cd
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * 
+ * <VertexKey> The vertex key type.
+ * <VertexValue> The vertex value type.
+ * <Message> The message type.
+ */
+public abstract class VertexUpdateFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value> implements Serializable {
+
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+	
+	public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
+	
+	public void setup(Configuration config) throws Exception {}
+	
+	public void preSuperstep() throws Exception {}
+	
+	public void postSuperstep() throws Exception {}
+	
+	public void setNewVertexValue(VertexValue newValue) {
+		outVal.setField(1, newValue);
+		out.collect(outVal);
+	}
+	
+	public int getSuperstep() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+	
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+	
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  internal methods
+	// --------------------------------------------------------------------------------------------
+	
+	private IterationRuntimeContext runtimeContext;
+	
+	private Collector<Record> out;
+	
+	private Record outVal;
+	
+	
+	void init(IterationRuntimeContext context) {
+		this.runtimeContext = context;
+	}
+	
+	void setOutput(Record val, Collector<Record> out) {
+		this.out = out;
+		this.outVal = val;
+	}
+	
+	// serializability
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
new file mode 100644
index 0000000..678b5e1
--- /dev/null
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.spargel.java;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.PlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
+import org.apache.flink.test.compiler.util.CompilerTestBase;
+
+
+public class SpargelCompilerTest extends CompilerTestBase {
+
+	@Test
+	public void testSpargelCompiler() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			// compose test program
+			{
+				DataSet<Long> vertexIds = env.generateSequence(1, 2);
+				
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+				
+				DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
+				
+				result.print();
+			}
+			
+			Plan p = env.createProgramPlan("Spargel Connected Components");
+			OptimizedPlan op = compileNoStats(p);
+			
+			// check the sink
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
+			
+			// check the iteration
+			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+			assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
+			
+			// check the solution set join and the delta
+			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
+			
+			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+			assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+			
+			// check the workset set join
+			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+			
+			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+			
+			// check that the initial partitioning is pushed out of the loop
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+			
+			// check that the initial workset sort is outside the loop
+			assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
+			assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSpargelCompilerWithBroadcastVariable() {
+		try {
+			final String BC_VAR_NAME = "borat variable";
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			// compose test program
+			{
+				DataSet<Long> bcVar = env.fromElements(1L);
+				
+				DataSet<Long> vertexIds = env.generateSequence(1, 2);
+				
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+				
+				DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+				
+				VertexCentricIteration<Long, Long, Long, ?> vcIter = VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100);
+				vcIter.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
+				vcIter.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
+				
+				DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(vcIter);
+				
+				result.print();
+			}
+			
+			Plan p = env.createProgramPlan("Spargel Connected Components");
+			OptimizedPlan op = compileNoStats(p);
+			
+			// check the sink
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
+			
+			// check the iteration
+			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+			assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
+			
+			// check the solution set join and the delta
+			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
+			
+			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+			assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+			
+			// check the workset set join
+			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+			
+			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+			
+			// check that the initial partitioning is pushed out of the loop
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
new file mode 100644
index 0000000..e862e7c
--- /dev/null
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.spargel.java;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.DeltaIteration;
+import org.apache.flink.api.java.DeltaIterationResultSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+
+@SuppressWarnings("serial")
+public class SpargelTranslationTest {
+
+	@Test
+	public void testTranslationPlainEdges() {
+		try {
+			final String ITERATION_NAME = "Test Name";
+			
+			final String AGGREGATOR_NAME = "AggregatorName";
+			
+			final String BC_SET_MESSAGES_NAME = "borat messages";
+			
+			final String BC_SET_UPDATES_NAME = "borat updates";
+			;
+			final int NUM_ITERATIONS = 13;
+			
+			final int ITERATION_DOP = 77;
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> bcMessaging = env.fromElements(1L);
+			DataSet<Long> bcUpdate = env.fromElements(1L);
+			
+			DataSet<Tuple2<String, Double>> result;
+			
+			// ------------ construct the test program ------------------
+			{
+				
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+	
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+				
+				
+				VertexCentricIteration<String, Double, Long, ?> vertexIteration = 
+						VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
+				vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
+				vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
+				
+				vertexIteration.setName(ITERATION_NAME);
+				vertexIteration.setParallelism(ITERATION_DOP);
+				
+				vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+				
+				result = initialVertices.runOperation(vertexIteration);
+			}
+			
+			
+			// ------------- validate the java program ----------------
+			
+			assertTrue(result instanceof DeltaIterationResultSet);
+			
+			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			
+			// check the basic iteration properties
+			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+			assertEquals(ITERATION_DOP, iteration.getParallelism());
+			assertEquals(ITERATION_NAME, iteration.getName());
+			
+			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			
+			// validate that the semantic properties are set as they should
+			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+			assertTrue(solutionSetJoin.getSematicProperties().getForwardedField1(0).contains(0));
+			assertTrue(solutionSetJoin.getSematicProperties().getForwardedField2(0).contains(0));
+			
+			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+			
+			// validate that the broadcast sets are forwarded
+			assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+			assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
+		try {
+			final String ITERATION_NAME = "Test Name";
+			
+			final String AGGREGATOR_NAME = "AggregatorName";
+			
+			final String BC_SET_MESSAGES_NAME = "borat messages";
+			
+			final String BC_SET_UPDATES_NAME = "borat updates";
+			;
+			final int NUM_ITERATIONS = 13;
+			
+			final int ITERATION_DOP = 77;
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> bcVar = env.fromElements(1L);
+			
+			DataSet<Tuple2<String, Double>> result;
+			
+			// ------------ construct the test program ------------------
+			{
+				
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+	
+				@SuppressWarnings("unchecked")
+				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+				
+				
+				VertexCentricIteration<String, Double, Long, ?> vertexIteration = 
+						VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
+				vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
+				vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
+				
+				vertexIteration.setName(ITERATION_NAME);
+				vertexIteration.setParallelism(ITERATION_DOP);
+				
+				vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+				
+				result = initialVertices.runOperation(vertexIteration);
+			}
+			
+			
+			// ------------- validate the java program ----------------
+			
+			assertTrue(result instanceof DeltaIterationResultSet);
+			
+			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			
+			// check the basic iteration properties
+			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+			assertEquals(ITERATION_DOP, iteration.getParallelism());
+			assertEquals(ITERATION_NAME, iteration.getName());
+			
+			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			
+			// validate that the semantic properties are set as they should
+			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+			assertTrue(solutionSetJoin.getSematicProperties().getForwardedField1(0).contains(0));
+			assertTrue(solutionSetJoin.getSematicProperties().getForwardedField2(0).contains(0));
+			
+			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+			
+			// validate that the broadcast sets are forwarded
+			assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+			assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long> {
+
+		@Override
+		public void updateVertex(String vertexKey, Double vertexValue, MessageIterator<Long> inMessages) {}
+	}
+	
+	public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, Object> {
+
+		@Override
+		public void sendMessages(String vertexKey, Double vertexValue) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
new file mode 100644
index 0000000..a34f2db
--- /dev/null
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.spargel;
+
+import java.io.BufferedReader;
+
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class SpargelConnectedComponentsITCase extends JavaProgramTestBase {
+
+	private static final long SEED = 9487520347802987L;
+	
+	private static final int NUM_VERTICES = 1000;
+	
+	private static final int NUM_EDGES = 10000;
+
+	private String resultPath;
+	
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempFilePath("results");
+	}
+	
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
+		DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
+		
+		DataSet<Tuple2<Long, Long>> edges = edgeString.map(new EdgeParser());
+		
+		DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+		DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
+		
+		result.writeAsCsv(resultPath, "\n", " ");
+		env.execute("Spargel Connected Components");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader reader : getResultReader(resultPath)) {
+			ConnectedComponentsData.checkOddEvenResult(reader);
+		}
+	}
+	
+	public static final class EdgeParser extends MapFunction<String, Tuple2<Long, Long>> {
+		public Tuple2<Long, Long> map(String value) {
+			String[] nums = value.split(" ");
+			return new Tuple2<Long, Long>(Long.parseLong(nums[0]), Long.parseLong(nums[1]));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/pom.xml b/flink-addons/flink-yarn/pom.xml
new file mode 100644
index 0000000..6c2d130
--- /dev/null
+++ b/flink-addons/flink-yarn/pom.xml
@@ -0,0 +1,60 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-addons</artifactId>
+		<version>0.6-incubating-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+	
+	<artifactId>flink-yarn</artifactId>
+	<name>flink-yarn</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>hadoop-core</artifactId>
+					<groupId>org.apache.hadoop</groupId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-yarn-client</artifactId>
+			<version>${hadoop.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<version>${hadoop.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<version>${hadoop.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-mapreduce-client-core</artifactId>
+			<version>${hadoop.version}</version>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
new file mode 100644
index 0000000..40635dc
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.flink.yarn;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Preconditions;
+
+public class ApplicationMaster {
+
+	private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+	
+	private void run() throws Exception  {
+		//Utils.logFilesInCurrentDirectory(LOG);
+		// Initialize clients to ResourceManager and NodeManagers
+		Configuration conf = Utils.initializeYarnConfiguration();
+		FileSystem fs = FileSystem.get(conf);
+		Map<String, String> envs = System.getenv();
+		final String currDir = envs.get(Environment.PWD.key());
+		final String logDirs =  envs.get(Environment.LOG_DIRS.key());
+		final String ownHostname = envs.get(Environment.NM_HOST.key());
+		final String appId = envs.get(Client.ENV_APP_ID);
+		final String clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
+		final String applicationMasterHost = envs.get(Environment.NM_HOST.key());
+		final String remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
+		final String shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
+		final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
+		final int taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
+		final int memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
+		final int coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
+		
+		int heapLimit = Utils.calculateHeapSize(memoryPerTaskManager);
+		
+		if(currDir == null) {
+			throw new RuntimeException("Current directory unknown");
+		}
+		if(ownHostname == null) {
+			throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
+		}
+		LOG.info("Working directory "+currDir);
+		
+		// load Flink configuration.
+		Utils.getFlinkConfiguration(currDir);
+		
+		final String localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
+		
+		// Update yaml conf -> set jobManager address to this machine's address.
+		FileInputStream fis = new FileInputStream(currDir+"/flink-conf.yaml");
+		BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+		Writer output = new BufferedWriter(new FileWriter(currDir+"/flink-conf-modified.yaml"));
+		String line ;
+		while ( (line = br.readLine()) != null) {
+			if(line.contains(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)) {
+				output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+			} else if(line.contains(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)) {
+				output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+"\n");
+			} else {
+				output.append(line+"\n");
+			}
+		}
+		// just to make sure.
+		output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+		output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
+		output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
+		output.close();
+		br.close();
+		File newConf = new File(currDir+"/flink-conf-modified.yaml");
+		if(!newConf.exists()) {
+			LOG.warn("modified yaml does not exist!");
+		}
+		
+		Utils.copyJarContents("resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME, 
+				ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+		
+		JobManager jm;
+		{
+			String pathToNepheleConfig = currDir+"/flink-conf-modified.yaml";
+			String[] args = {"-executionMode","cluster", "-configDir", pathToNepheleConfig};
+			
+			// start the job manager
+			jm = JobManager.initialize( args );
+			
+			// Start info server for jobmanager
+			jm.startInfoServer();
+		}
+		
+		AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
+		rmClient.init(conf);
+		rmClient.start();
+
+		NMClient nmClient = NMClient.createNMClient();
+		nmClient.init(conf);
+		nmClient.start();
+
+		// Register with ResourceManager
+		LOG.info("registering ApplicationMaster");
+		rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
+
+		// Priority for worker containers - priorities are intra-application
+		Priority priority = Records.newRecord(Priority.class);
+		priority.setPriority(0);
+
+		// Resource requirements for worker containers
+		Resource capability = Records.newRecord(Resource.class);
+		capability.setMemory(memoryPerTaskManager);
+		capability.setVirtualCores(coresPerTaskManager);
+
+		// Make container requests to ResourceManager
+		for (int i = 0; i < taskManagerCount; ++i) {
+			ContainerRequest containerAsk = new ContainerRequest(capability,
+					null, null, priority);
+			LOG.info("Requesting TaskManager container " + i);
+			rmClient.addContainerRequest(containerAsk);
+		}
+		
+		LocalResource flinkJar = Records.newRecord(LocalResource.class);
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+
+		// register Flink Jar with remote HDFS
+		final Path remoteJarPath = new Path(remoteFlinkJarPath);
+		Utils.registerLocalResource(fs, remoteJarPath, flinkJar);
+		
+		// register conf with local fs.
+		Path remoteConfPath = Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/flink-conf-modified.yaml"), flinkConf, new Path(clientHomeDir));
+		LOG.info("Prepared localresource for modified yaml: "+flinkConf);
+		
+		
+		boolean hasLog4j = new File(currDir+"/log4j.properties").exists();
+		// prepare the files to ship
+		LocalResource[] remoteShipRsc = null;
+		String[] remoteShipPaths = shipListString.split(",");
+		if(!shipListString.isEmpty()) {
+			remoteShipRsc = new LocalResource[remoteShipPaths.length]; 
+			{ // scope for i
+				int i = 0;
+				for(String remoteShipPathStr : remoteShipPaths) {
+					if(remoteShipPathStr == null || remoteShipPathStr.isEmpty()) {
+						continue;
+					}
+					remoteShipRsc[i] = Records.newRecord(LocalResource.class);
+					Path remoteShipPath = new Path(remoteShipPathStr);
+					Utils.registerLocalResource(fs, remoteShipPath, remoteShipRsc[i]);
+					i++;
+				}
+			}
+		}
+		
+		// respect custom JVM options in the YAML file
+		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+				
+		// Obtain allocated containers and launch
+		int allocatedContainers = 0;
+		int completedContainers = 0;
+		while (allocatedContainers < taskManagerCount) {
+			AllocateResponse response = rmClient.allocate(0);
+			for (Container container : response.getAllocatedContainers()) {
+				LOG.info("Got new Container for TM "+container.getId()+" on host "+container.getNodeId().getHost());
+				++allocatedContainers;
+
+				// Launch container by create ContainerLaunchContext
+				ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+				
+				String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts ;
+				if(hasLog4j) {
+					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
+				}
+				tmCommand	+= " org.apache.flink.yarn.YarnTaskManagerRunner -configDir . "
+						+ " 1>"
+						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+						+ "/taskmanager-stdout.log" 
+						+ " 2>"
+						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+						+ "/taskmanager-stderr.log";
+				ctx.setCommands(Collections.singletonList(tmCommand));
+				
+				LOG.info("Starting TM with command="+tmCommand);
+				
+				// copy resources to the TaskManagers.
+				Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
+				localResources.put("flink.jar", flinkJar);
+				localResources.put("flink-conf.yaml", flinkConf);
+				
+				// add ship resources
+				if(!shipListString.isEmpty()) {
+					Preconditions.checkNotNull(remoteShipRsc);
+					for( int i = 0; i < remoteShipPaths.length; i++) {
+						localResources.put(new Path(remoteShipPaths[i]).getName(), remoteShipRsc[i]);
+					}
+				}
+				
+				
+				ctx.setLocalResources(localResources);
+				
+				// Setup CLASSPATH for Container (=TaskTracker)
+				Map<String, String> containerEnv = new HashMap<String, String>();
+				Utils.setupEnv(conf, containerEnv); //add flink.jar to class path.
+				containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername);
+				
+				ctx.setEnvironment(containerEnv);
+
+				UserGroupInformation user = UserGroupInformation.getCurrentUser();
+				try {
+					Credentials credentials = user.getCredentials();
+					DataOutputBuffer dob = new DataOutputBuffer();
+					credentials.writeTokenStorageToStream(dob);
+					ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
+							0, dob.getLength());
+					ctx.setTokens(securityTokens);
+				} catch (IOException e) {
+					LOG.warn("Getting current user info failed when trying to launch the container"
+							+ e.getMessage());
+				}
+				
+				LOG.info("Launching container " + allocatedContainers);
+				nmClient.startContainer(container, ctx);
+			}
+			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+				++completedContainers;
+				LOG.info("Completed container (while allocating) "+status.getContainerId()+". Total Completed:" + completedContainers);
+				LOG.info("Diagnostics "+status.getDiagnostics());
+			}
+			Thread.sleep(100);
+		}
+
+		// Now wait for containers to complete
+		
+		while (completedContainers < taskManagerCount) {
+			AllocateResponse response = rmClient.allocate(completedContainers
+					/ taskManagerCount);
+			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+				++completedContainers;
+				LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
+				LOG.info("Diagnostics "+status.getDiagnostics());
+			}
+			Thread.sleep(5000);
+		}
+		LOG.info("Shutting down JobManager");
+		jm.shutdown();
+		
+		// Un-register with ResourceManager
+		rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
+		
+		
+	}
+	public static void main(String[] args) throws Exception {
+		final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
+		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
+				+ " user to execute Flink ApplicationMaster/JobManager to '"+yarnClientUsername+"'");
+		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
+			ugi.addToken(toks);
+		}
+		ugi.doAs(new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				try {
+					new ApplicationMaster().run();
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
+				return null;
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
new file mode 100644
index 0000000..6d4c7b5
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -0,0 +1,633 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.flink.yarn;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarFile;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+/**
+ * All classes in this package contain code taken from
+ * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+ * and
+ * https://github.com/hortonworks/simple-yarn-app
+ * and 
+ * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+ * 
+ * The Flink jar is uploaded to HDFS by this client. 
+ * The application master and all the TaskManager containers get the jar file downloaded
+ * by YARN into their local fs.
+ * 
+ */
+public class Client {
+	private static final Log LOG = LogFactory.getLog(Client.class);
+	
+	/**
+	 * Command Line argument options
+	 */
+	private static final Option QUERY = new Option("q","query",false, "Display available YARN resources (memory, cores)");
+	// --- or ---
+	private static final Option VERBOSE = new Option("v","verbose",false, "Verbose debug mode");
+	private static final Option GEN_CONF = new Option("g","generateConf",false, "Place default configuration file in current directory");
+	private static final Option QUEUE = new Option("qu","queue",true, "Specify YARN queue.");
+	private static final Option SHIP_PATH = new Option("s","ship",true, "Ship files in the specified directory");
+	private static final Option FLINK_CONF_DIR = new Option("c","confDir",true, "Path to Flink configuration directory");
+	private static final Option FLINK_JAR = new Option("j","jar",true, "Path to Flink jar file");
+	private static final Option JM_MEMORY = new Option("jm","jobManagerMemory",true, "Memory for JobManager Container [in MB]");
+	private static final Option TM_MEMORY = new Option("tm","taskManagerMemory",true, "Memory per TaskManager Container [in MB]");
+	private static final Option TM_CORES = new Option("tmc","taskManagerCores",true, "Virtual CPU cores per TaskManager");
+	private static final Option CONTAINER = new Option("n","container",true, "Number of Yarn container to allocate (=Number of"
+			+ " TaskTrackers)");
+	
+	/**
+	 * Constants
+	 */
+	// environment variable names 
+	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
+	public final static String ENV_TM_CORES = "_CLIENT_TM_CORES";
+	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
+	public final static String ENV_APP_ID = "_APP_ID";
+	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
+	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
+	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+	
+	private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
+
+	
+	
+	private Configuration conf;
+
+	public void run(String[] args) throws Exception {
+		
+		if(UserGroupInformation.isSecurityEnabled()) {
+			throw new RuntimeException("Flink YARN client does not have security support right now."
+					+ "File a bug, we will fix it asap");
+		}
+		//Utils.logFilesInCurrentDirectory(LOG);
+		//
+		//	Command Line Options
+		//
+		Options options = new Options();
+		options.addOption(VERBOSE);
+		options.addOption(FLINK_CONF_DIR);
+		options.addOption(FLINK_JAR);
+		options.addOption(JM_MEMORY);
+		options.addOption(TM_MEMORY);
+		options.addOption(TM_CORES);
+		options.addOption(CONTAINER);
+		options.addOption(GEN_CONF);
+		options.addOption(QUEUE);
+		options.addOption(QUERY);
+		options.addOption(SHIP_PATH);
+		
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd = null;
+		try {
+			cmd = parser.parse( options, args);
+		} catch(MissingOptionException moe) {
+			System.out.println(moe.getMessage());
+			printUsage();
+			System.exit(1);
+		}
+		
+		if (System.getProperty("log4j.configuration") == null) {
+			Logger root = Logger.getRootLogger();
+			root.removeAllAppenders();
+			PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
+			ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
+			root.addAppender(appender);
+			if(cmd.hasOption(VERBOSE.getOpt())) {
+				root.setLevel(Level.DEBUG);
+				LOG.debug("CLASSPATH: "+System.getProperty("java.class.path"));
+			} else {
+				root.setLevel(Level.INFO);
+			}
+		}
+		
+		
+		// Jar Path
+		Path localJarPath;
+		if(cmd.hasOption(FLINK_JAR.getOpt())) {
+			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
+			if(!userPath.startsWith("file://")) {
+				userPath = "file://" + userPath;
+			}
+			localJarPath = new Path(userPath);
+		} else {
+			localJarPath = new Path("file://"+Client.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+		}
+		
+		if(cmd.hasOption(GEN_CONF.getOpt())) {
+			LOG.info("Placing default configuration in current directory");
+			File outFile = generateDefaultConf(localJarPath);
+			LOG.info("File written to "+outFile.getAbsolutePath());
+			System.exit(0);
+		}
+		
+		// Conf Path 
+		Path confPath = null;
+		String confDirPath = "";
+		if(cmd.hasOption(FLINK_CONF_DIR.getOpt())) {
+			confDirPath = cmd.getOptionValue(FLINK_CONF_DIR.getOpt())+"/";
+			File confFile = new File(confDirPath+CONFIG_FILE_NAME);
+			if(!confFile.exists()) {
+				LOG.fatal("Unable to locate configuration file in "+confFile);
+				System.exit(1);
+			}
+			confPath = new Path(confFile.getAbsolutePath());
+		} else {
+			System.out.println("No configuration file has been specified");
+			
+			// no configuration path given.
+			// -> see if there is one in the current directory
+			File currDir = new File(".");
+			File[] candidates = currDir.listFiles(new FilenameFilter() {
+				@Override
+				public boolean accept(final File dir, final String name) {
+					return name != null && name.endsWith(".yaml");
+				}
+			});
+			if(candidates == null || candidates.length == 0) {
+				System.out.println("No configuration file has been found in current directory.\n"
+						+ "Copying default.");
+				File outFile = generateDefaultConf(localJarPath);
+				confPath = new Path(outFile.toURI());
+			} else {
+				if(candidates.length > 1) {
+					System.out.println("Multiple .yaml configuration files were found in the current directory\n"
+							+ "Please specify one explicitly");
+					System.exit(1);
+				} else if(candidates.length == 1) {
+					confPath = new Path(candidates[0].toURI());
+				} 
+			}
+		}
+		List<File> shipFiles = new ArrayList<File>();
+		// path to directory to ship
+		if(cmd.hasOption(SHIP_PATH.getOpt())) {
+			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
+			File shipDir = new File(shipPath);
+			if(shipDir.isDirectory()) {
+				shipFiles = new ArrayList<File>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
+					@Override
+					public boolean accept(File dir, String name) {
+						return !(name.equals(".") || name.equals("..") );
+					}
+				})));
+			} else {
+				LOG.warn("Ship directory is not a directory!");
+			}
+		}
+		boolean hasLog4j = false;
+		//check if there is a log4j file
+		if(confDirPath.length() > 0) {
+			File l4j = new File(confDirPath+"/log4j.properties");
+			if(l4j.exists()) {
+				shipFiles.add(l4j);
+				hasLog4j = true;
+			}
+		}
+		
+		// queue
+		String queue = "default";
+		if(cmd.hasOption(QUEUE.getOpt())) {
+			queue = cmd.getOptionValue(QUEUE.getOpt());
+		}
+		
+		// JobManager Memory
+		int jmMemory = 512;
+		if(cmd.hasOption(JM_MEMORY.getOpt())) {
+			jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
+		}
+		
+		// Task Managers memory
+		int tmMemory = 1024;
+		if(cmd.hasOption(TM_MEMORY.getOpt())) {
+			tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
+		}
+		
+		// Task Managers vcores
+		int tmCores = 1;
+		if(cmd.hasOption(TM_CORES.getOpt())) {
+			tmCores = Integer.valueOf(cmd.getOptionValue(TM_CORES.getOpt()));
+		}
+		Utils.getFlinkConfiguration(confPath.toUri().getPath());
+		int jmPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0);
+		if(jmPort == 0) {
+			LOG.warn("Unable to find job manager port in configuration!");
+			jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
+		}
+		conf = Utils.initializeYarnConfiguration();
+		
+		// intialize HDFS
+		LOG.info("Copy App Master jar from local filesystem and add to local environment");
+		// Copy the application master jar to the filesystem 
+		// Create a local resource to point to the destination jar path 
+		final FileSystem fs = FileSystem.get(conf);
+		
+		if(fs.getScheme().startsWith("file")) {
+			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+					+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
+					+ "The Flink YARN client needs to store its files in a distributed file system");
+		}
+		
+		// Create yarnClient
+		final YarnClient yarnClient = YarnClient.createYarnClient();
+		yarnClient.init(conf);
+		yarnClient.start();
+		
+		// Query cluster for metrics
+		if(cmd.hasOption(QUERY.getOpt())) {
+			showClusterMetrics(yarnClient);
+		}
+		if(!cmd.hasOption(CONTAINER.getOpt())) {
+			LOG.fatal("Missing required argument "+CONTAINER.getOpt());
+			printUsage();
+			yarnClient.stop();
+			System.exit(1);
+		}
+		
+		// TM Count
+		final int taskManagerCount = Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()));
+		
+		System.out.println("Using values:");
+		System.out.println("\tContainer Count = "+taskManagerCount);
+		System.out.println("\tJar Path = "+localJarPath.toUri().getPath());
+		System.out.println("\tConfiguration file = "+confPath.toUri().getPath());
+		System.out.println("\tJobManager memory = "+jmMemory);
+		System.out.println("\tTaskManager memory = "+tmMemory);
+		System.out.println("\tTaskManager cores = "+tmCores);
+
+		// Create application via yarnClient
+		YarnClientApplication app = yarnClient.createApplication();
+		GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+		Resource maxRes = appResponse.getMaximumResourceCapability();
+		if(tmMemory > maxRes.getMemory() || tmCores > maxRes.getVirtualCores()) {
+			LOG.fatal("The cluster does not have the requested resources for the TaskManagers available!\n"
+					+ "Maximum Memory: "+maxRes.getMemory() +", Maximum Cores: "+tmCores);
+			yarnClient.stop();
+			System.exit(1);
+		}
+		if(jmMemory > maxRes.getMemory() ) {
+			LOG.fatal("The cluster does not have the requested resources for the JobManager available!\n"
+					+ "Maximum Memory: "+maxRes.getMemory());
+			yarnClient.stop();
+			System.exit(1);
+		}
+		int totalMemoryRequired = jmMemory + tmMemory * taskManagerCount;
+		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+			LOG.fatal("This YARN session requires "+totalMemoryRequired+"MB of memory in the cluster. "
+					+ "There are currently only "+freeClusterMem.totalFreeMemory+"MB available.");
+			yarnClient.stop();
+			System.exit(1);
+		}
+		if( tmMemory > freeClusterMem.containerLimit) {
+			LOG.fatal("The requested amount of memory for the TaskManagers ("+tmMemory+"MB) is more than "
+					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
+			yarnClient.stop();
+			System.exit(1);
+		}
+		if( jmMemory > freeClusterMem.containerLimit) {
+			LOG.fatal("The requested amount of memory for the JobManager ("+jmMemory+"MB) is more than "
+					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
+			yarnClient.stop();
+			System.exit(1);
+		}
+		
+		// respect custom JVM options in the YAML file
+		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+		
+		// Set up the container launch context for the application master
+		ContainerLaunchContext amContainer = Records
+				.newRecord(ContainerLaunchContext.class);
+		
+		String amCommand = "$JAVA_HOME/bin/java"
+					+ " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts;
+		if(hasLog4j) {
+			amCommand 	+= " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
+		}
+		amCommand 	+= " org.apache.flink.yarn.ApplicationMaster" + " "
+					+ " 1>"
+					+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
+					+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
+		amContainer.setCommands(Collections.singletonList(amCommand));
+		
+		System.err.println("amCommand="+amCommand);
+		
+		// Set-up ApplicationSubmissionContext for the application
+		ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+		final ApplicationId appId = appContext.getApplicationId();
+		
+		// Setup jar for ApplicationMaster
+		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+		Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), localJarPath, appMasterJar, fs.getHomeDirectory());
+		Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), confPath, flinkConf, fs.getHomeDirectory());
+		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
+		localResources.put("flink.jar", appMasterJar);
+		localResources.put("flink-conf.yaml", flinkConf);
+		
+		
+		// setup security tokens (code from apache storm)
+		final Path[] paths = new Path[3 + shipFiles.size()];
+		StringBuffer envShipFileList = new StringBuffer();
+		// upload ship files
+		for (int i = 0; i < shipFiles.size(); i++) {
+			File shipFile = shipFiles.get(i);
+			LocalResource shipResources = Records.newRecord(LocalResource.class);
+			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
+			paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
+					shipLocalPath, shipResources, fs.getHomeDirectory());
+			localResources.put(shipFile.getName(), shipResources);
+			
+			envShipFileList.append(paths[3 + i]);
+			if(i+1 < shipFiles.size()) {
+				envShipFileList.append(',');
+			}
+		}
+
+		paths[0] = remotePathJar;
+		paths[1] = remotePathConf;
+		paths[2] = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+		fs.setPermission(paths[2], permission); // set permission for path.
+		Utils.setTokensFor(amContainer, paths, this.conf);
+		
+		 
+		amContainer.setLocalResources(localResources);
+		fs.close();
+
+		// Setup CLASSPATH for ApplicationMaster
+		Map<String, String> appMasterEnv = new HashMap<String, String>();
+		Utils.setupEnv(conf, appMasterEnv);
+		// set configuration values
+		appMasterEnv.put(Client.ENV_TM_COUNT, String.valueOf(taskManagerCount));
+		appMasterEnv.put(Client.ENV_TM_CORES, String.valueOf(tmCores));
+		appMasterEnv.put(Client.ENV_TM_MEMORY, String.valueOf(tmMemory));
+		appMasterEnv.put(Client.FLINK_JAR_PATH, remotePathJar.toString() );
+		appMasterEnv.put(Client.ENV_APP_ID, appId.toString());
+		appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
+		appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
+		appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
+		
+		amContainer.setEnvironment(appMasterEnv);
+		
+		// Set up resource type requirements for ApplicationMaster
+		Resource capability = Records.newRecord(Resource.class);
+		capability.setMemory(jmMemory);
+		capability.setVirtualCores(1);
+		
+		appContext.setApplicationName("Flink"); // application name
+		appContext.setAMContainerSpec(amContainer);
+		appContext.setResource(capability);
+		appContext.setQueue(queue);
+		
+		// file that we write into the conf/ dir containing the jobManager address.
+		final File addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
+		
+		Runtime.getRuntime().addShutdownHook(new Thread() {
+		@Override
+		public void run() {
+			try {
+				LOG.info("Killing the Flink-YARN application.");
+				yarnClient.killApplication(appId);
+				LOG.info("Deleting files in "+paths[2]);
+				FileSystem shutFS = FileSystem.get(conf);
+				shutFS.delete(paths[2], true); // delete conf and jar file.
+				shutFS.close();
+			} catch (Exception e) {
+				LOG.warn("Exception while killing the YARN application", e);
+			}
+			try {
+				addrFile.delete();
+			} catch (Exception e) {
+				LOG.warn("Exception while deleting the jobmanager address file", e);
+			}
+			LOG.info("YARN Client is shutting down");
+			yarnClient.stop();
+		}
+		});
+		
+		LOG.info("Submitting application master " + appId);
+		yarnClient.submitApplication(appContext);
+		ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+		YarnApplicationState appState = appReport.getYarnApplicationState();
+		boolean told = false;
+		char[] el = { '/', '|', '\\', '-'};
+		int i = 0; 
+		while (appState != YarnApplicationState.FINISHED
+				&& appState != YarnApplicationState.KILLED
+				&& appState != YarnApplicationState.FAILED) {
+			if(!told && appState ==  YarnApplicationState.RUNNING) {
+				System.err.println("Flink JobManager is now running on "+appReport.getHost()+":"+jmPort);
+				System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
+				// write jobmanager connect information
+				
+				PrintWriter out = new PrintWriter(addrFile);
+				out.println(appReport.getHost()+":"+jmPort);
+				out.close();
+				addrFile.setReadable(true, false); // readable for all.
+				told = true;
+			}
+			if(!told) {
+				System.err.print(el[i++]+"\r");
+				if(i == el.length) {
+					i = 0;
+				}
+				Thread.sleep(500); // wait for the application to switch to RUNNING
+			} else {
+				Thread.sleep(5000);
+			}
+			
+			appReport = yarnClient.getApplicationReport(appId);
+			appState = appReport.getYarnApplicationState();
+		}
+
+		LOG.info("Application " + appId + " finished with"
+				+ " state " + appState + " at " + appReport.getFinishTime());
+		if(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED ) {
+			LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
+		}
+		
+	}
+	private static class ClusterResourceDescription {
+		public int totalFreeMemory;
+		public int containerLimit;
+	}
+	private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
+		ClusterResourceDescription crd = new ClusterResourceDescription();
+		crd.totalFreeMemory = 0;
+		crd.containerLimit = 0;
+		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+		for(NodeReport rep : nodes) {
+			int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
+			crd.totalFreeMemory += free;
+			if(free > crd.containerLimit) {
+				crd.containerLimit = free;
+			}
+		}
+		return crd;
+	}
+
+	private void printUsage() {
+		System.out.println("Usage:");
+		HelpFormatter formatter = new HelpFormatter();
+		formatter.setWidth(200);
+		formatter.setLeftPadding(5);
+		formatter.setSyntaxPrefix("   Required");
+		Options req = new Options();
+		req.addOption(CONTAINER);
+		formatter.printHelp(" ", req);
+		
+		formatter.setSyntaxPrefix("   Optional");
+		Options opt = new Options();
+		opt.addOption(VERBOSE);
+	//	opt.addOption(GEN_CONF);
+	//	opt.addOption(STRATOSPHERE_CONF);
+	//	opt.addOption(STRATOSPHERE_JAR);
+		opt.addOption(JM_MEMORY);
+		opt.addOption(TM_MEMORY);
+		opt.addOption(TM_CORES);
+		opt.addOption(QUERY);
+		opt.addOption(QUEUE);
+		formatter.printHelp(" ", opt);
+	}
+
+	private void showClusterMetrics(YarnClient yarnClient)
+			throws YarnException, IOException {
+		YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
+		System.out.println("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
+		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+		final String format = "|%-16s |%-16s %n";
+		System.out.printf("|Property         |Value          %n");
+		System.out.println("+---------------------------------------+");
+		int totalMemory = 0;
+		int totalCores = 0;
+		for(NodeReport rep : nodes) {
+			final Resource res = rep.getCapability();
+			totalMemory += res.getMemory();
+			totalCores += res.getVirtualCores();
+			System.out.format(format, "NodeID", rep.getNodeId());
+			System.out.format(format, "Memory", res.getMemory()+" MB");
+			System.out.format(format, "vCores", res.getVirtualCores());
+			System.out.format(format, "HealthReport", rep.getHealthReport());
+			System.out.format(format, "Containers", rep.getNumContainers());
+			System.out.println("+---------------------------------------+");
+		}
+		System.out.println("Summary: totalMemory "+totalMemory+" totalCores "+totalCores);
+		List<QueueInfo> qInfo = yarnClient.getAllQueues();
+		for(QueueInfo q : qInfo) {
+			System.out.println("Queue: "+q.getQueueName()+", Current Capacity: "+q.getCurrentCapacity()+" Max Capacity: "+q.getMaximumCapacity()+" Applications: "+q.getApplications().size());
+		}
+		yarnClient.stop();
+		System.exit(0);
+	}
+
+	private File generateDefaultConf(Path localJarPath) throws IOException,
+			FileNotFoundException {
+		JarFile jar = null;
+		try {
+			jar = new JarFile(localJarPath.toUri().getPath());
+		} catch(FileNotFoundException fne) {
+			LOG.fatal("Unable to access jar file. Specify jar file or configuration file.", fne);
+			System.exit(1);
+		}
+		InputStream confStream = jar.getInputStream(jar.getEntry("flink-conf.yaml"));
+		
+		if(confStream == null) {
+			LOG.warn("Given jar file does not contain yaml conf.");
+			confStream = this.getClass().getResourceAsStream("flink-conf.yaml"); 
+			if(confStream == null) {
+				throw new RuntimeException("Unable to find flink-conf in jar file");
+			}
+		}
+		File outFile = new File("flink-conf.yaml");
+		if(outFile.exists()) {
+			throw new RuntimeException("File unexpectedly exists");
+		}
+		FileOutputStream outputStream = new FileOutputStream(outFile);
+		int read = 0;
+		byte[] bytes = new byte[1024];
+		while ((read = confStream.read(bytes)) != -1) {
+			outputStream.write(bytes, 0, read);
+		}
+		confStream.close(); outputStream.close(); jar.close();
+		return outFile;
+	}
+
+	public static void main(String[] args) throws Exception {
+		Client c = new Client();
+		c.run(args);
+	}
+}