Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 14:48:43 UTC
[64/73] [abbrv] prefix all projects in addons and quickstarts with flink-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
new file mode 100644
index 0000000..3a58afc
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.AggregatorRegistry;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.record.functions.CoGroupFunction;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsFirst;
+import org.apache.flink.api.java.record.operators.CoGroupOperator;
+import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.ReflectionUtil;
+
+public class SpargelIteration {
+
+ private static final String DEFAULT_NAME = "<unnamed vertex-centric iteration>";
+
+ private final DeltaIteration iteration;
+
+ private final Class<? extends Key<?>> vertexKey;
+ private final Class<? extends Value> vertexValue;
+ private final Class<? extends Value> messageType;
+ private final Class<? extends Value> edgeValue;
+
+ private final CoGroupOperator vertexUpdater;
+ private final CoGroupOperator messager;
+
+
+ // ----------------------------------------------------------------------------------
+
+ public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value>
+ SpargelIteration(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+ VertexUpdateFunction<VertexKey, VertexValue, Message> uf)
+ {
+ this(mf, uf, DEFAULT_NAME);
+ }
+
+ public <VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value, EdgeValue extends Value> SpargelIteration(
+ MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
+ String name)
+ {
+ // get the types
+ this.vertexKey = ReflectionUtil.getTemplateType1(mf.getClass());
+ this.vertexValue = ReflectionUtil.getTemplateType2(mf.getClass());
+ this.messageType = ReflectionUtil.getTemplateType3(mf.getClass());
+ this.edgeValue = ReflectionUtil.getTemplateType4(mf.getClass());
+
+ if (vertexKey == null || vertexValue == null || messageType == null || edgeValue == null) {
+ throw new RuntimeException("Could not determine the vertex key, vertex value, message, or edge value type of the messaging function.");
+ }
+
+ // instantiate the data flow
+ this.iteration = new DeltaIteration(0, name);
+
+ this.messager = CoGroupOperator.builder(MessagingDriver.class, vertexKey, 0, 0)
+ .input2(iteration.getWorkset())
+ .name("Message Sender")
+ .build();
+ this.vertexUpdater = CoGroupOperator.builder(VertexUpdateDriver.class, vertexKey, 0, 0)
+ .input1(messager)
+ .input2(iteration.getSolutionSet())
+ .name("Vertex Updater")
+ .build();
+
+ iteration.setNextWorkset(vertexUpdater);
+ iteration.setSolutionSetDelta(vertexUpdater);
+
+ // parameterize the data flow
+ try {
+ Configuration vertexUdfParams = vertexUpdater.getParameters();
+ InstantiationUtil.writeObjectToConfig(uf, vertexUdfParams, VertexUpdateDriver.UDF_PARAM);
+ vertexUdfParams.setClass(VertexUpdateDriver.KEY_PARAM, vertexKey);
+ vertexUdfParams.setClass(VertexUpdateDriver.VALUE_PARAM, vertexValue);
+ vertexUdfParams.setClass(VertexUpdateDriver.MESSAGE_PARAM, messageType);
+
+ Configuration messageUdfParams = messager.getParameters();
+ InstantiationUtil.writeObjectToConfig(mf, messageUdfParams, MessagingDriver.UDF_PARAM);
+ messageUdfParams.setClass(MessagingDriver.KEY_PARAM, vertexKey);
+ messageUdfParams.setClass(MessagingDriver.VALUE_PARAM, vertexValue);
+ messageUdfParams.setClass(MessagingDriver.MESSAGE_PARAM, messageType);
+ messageUdfParams.setClass(MessagingDriver.EDGE_PARAM, edgeValue);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Could not serialize the UDFs for distribution" +
+ (e.getMessage() == null ? '.' : ": " + e.getMessage()), e);
+ }
+ }
+
+ // ----------------------------------------------------------------------------------
+ // inputs and outputs
+ // ----------------------------------------------------------------------------------
+
+ public void setVertexInput(Operator<Record> c) {
+ this.iteration.setInitialSolutionSet(c);
+ this.iteration.setInitialWorkset(c);
+ }
+
+ public void setEdgesInput(Operator<Record> c) {
+ this.messager.setFirstInput(c);
+ }
+
+ public Operator<?> getOutput() {
+ return this.iteration;
+ }
+
+ public void setDegreeOfParallelism(int dop) {
+ this.iteration.setDegreeOfParallelism(dop);
+ }
+
+ public void setNumberOfIterations(int iterations) {
+ this.iteration.setMaximumNumberOfIterations(iterations);
+ }
+
+ public AggregatorRegistry getAggregators() {
+ return this.iteration.getAggregators();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Wrapping UDFs
+ // --------------------------------------------------------------------------------------------
+
+ @ConstantFieldsFirst(0)
+ public static final class VertexUpdateDriver<K extends Key<K>, V extends Value, M extends Value> extends CoGroupFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String UDF_PARAM = "spargel.udf";
+ private static final String KEY_PARAM = "spargel.key-type";
+ private static final String VALUE_PARAM = "spargel.value-type";
+ private static final String MESSAGE_PARAM = "spargel.message-type";
+
+ private VertexUpdateFunction<K, V, M> vertexUpdateFunction;
+
+ private K vertexKey;
+ private V vertexValue;
+ private MessageIterator<M> messageIter;
+
+ @Override
+ public void coGroup(Iterator<Record> messages, Iterator<Record> vertex, Collector<Record> out) throws Exception {
+
+ if (vertex.hasNext()) {
+ Record first = vertex.next();
+ first.getFieldInto(0, vertexKey);
+ first.getFieldInto(1, vertexValue);
+ messageIter.setSource(messages);
+ vertexUpdateFunction.setOutput(first, out);
+ vertexUpdateFunction.updateVertex(vertexKey, vertexValue, messageIter);
+ } else {
+ if (messages.hasNext()) {
+ String message = "Target vertex does not exist!";
+ try {
+ Record next = messages.next();
+ next.getFieldInto(0, vertexKey);
+ message = "Target vertex '" + vertexKey + "' does not exist!";
+ } catch (Throwable t) {
+ // ignore and fall back to the generic message
+ }
+ throw new Exception(message);
+ } else {
+ throw new Exception("Co-group was invoked with neither a vertex state nor messages.");
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // instantiate only the first time
+ if (vertexUpdateFunction == null) {
+ Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class);
+ Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class);
+ Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
+
+ vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
+ vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
+ messageIter = new MessageIterator<M>(InstantiationUtil.instantiate(messageClass, Value.class));
+
+ try {
+ this.vertexUpdateFunction = (VertexUpdateFunction<K, V, M>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader());
+ } catch (Exception e) {
+ String message = e.getMessage() == null ? "." : ": " + e.getMessage();
+ throw new Exception("Could not instantiate VertexUpdateFunction" + message, e);
+ }
+
+ this.vertexUpdateFunction.init(getIterationRuntimeContext());
+ this.vertexUpdateFunction.setup(parameters);
+ }
+ this.vertexUpdateFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.vertexUpdateFunction.postSuperstep();
+ }
+ }
+
+ public static final class MessagingDriver<K extends Key<K>, V extends Value, M extends Value, E extends Value> extends CoGroupFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String UDF_PARAM = "spargel.udf";
+ private static final String KEY_PARAM = "spargel.key-type";
+ private static final String VALUE_PARAM = "spargel.value-type";
+ private static final String MESSAGE_PARAM = "spargel.message-type";
+ private static final String EDGE_PARAM = "spargel.edge-value";
+
+
+ private MessagingFunction<K, V, M, E> messagingFunction;
+
+ private K vertexKey;
+ private V vertexValue;
+
+ @Override
+ public void coGroup(Iterator<Record> edges, Iterator<Record> state, Collector<Record> out) throws Exception {
+ if (state.hasNext()) {
+ Record first = state.next();
+ first.getFieldInto(0, vertexKey);
+ first.getFieldInto(1, vertexValue);
+ messagingFunction.set(edges, out);
+ messagingFunction.sendMessages(vertexKey, vertexValue);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // instantiate only the first time
+ if (messagingFunction == null) {
+ Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class);
+ Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class);
+// Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
+ Class<E> edgeClass = parameters.getClass(EDGE_PARAM, null, Value.class);
+
+ vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
+ vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
+
+ K edgeKeyHolder = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
+ E edgeValueHolder = InstantiationUtil.instantiate(edgeClass, Value.class);
+
+ try {
+ this.messagingFunction = (MessagingFunction<K, V, M, E>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader());
+ } catch (Exception e) {
+ String message = e.getMessage() == null ? "." : ": " + e.getMessage();
+ throw new Exception("Could not instantiate MessagingFunction" + message, e);
+ }
+
+ this.messagingFunction.init(getIterationRuntimeContext(), edgeKeyHolder, edgeValueHolder);
+ this.messagingFunction.setup(parameters);
+ }
+ this.messagingFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.messagingFunction.postSuperstep();
+ }
+ }
+}
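
For orientation, the following is a minimal usage sketch of the SpargelIteration class above. It assumes hypothetical MinIdMessager and MinIdUpdater implementations of the record-based MessagingFunction and VertexUpdateFunction (a MinIdUpdater sketch follows the VertexUpdateFunction file below), plus existing Record-producing "vertices" and "edges" operators; none of this is part of the commit itself.

    // Hypothetical sketch, not part of this commit. Assumes vertices and
    // edges are Operator<Record> sources and MinIdMessager / MinIdUpdater
    // are concrete MessagingFunction / VertexUpdateFunction subclasses.
    SpargelIteration iteration = new SpargelIteration(
            new MinIdMessager(), new MinIdUpdater(), "Connected Components");
    iteration.setVertexInput(vertices);      // Records of (vertex id, component id)
    iteration.setEdgesInput(edges);          // Records of (source id, target id)
    iteration.setNumberOfIterations(100);
    iteration.setDegreeOfParallelism(4);
    Operator<?> result = iteration.getOutput(); // hand this to a data sink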
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
new file mode 100644
index 0000000..37e32cd
--- /dev/null
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/VertexUpdateFunction.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.spargel.java.record;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base class for functions that compute a new vertex value from the current value
+ * and the incoming messages, within a vertex-centric iteration.
+ *
+ * @param <VertexKey> The vertex key type.
+ * @param <VertexValue> The vertex value type.
+ * @param <Message> The message type.
+ */
+public abstract class VertexUpdateFunction<VertexKey extends Key<VertexKey>, VertexValue extends Value, Message extends Value> implements Serializable {
+
+ // --------------------------------------------------------------------------------------------
+ // Public API Methods
+ // --------------------------------------------------------------------------------------------
+
+ public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
+
+ public void setup(Configuration config) throws Exception {}
+
+ public void preSuperstep() throws Exception {}
+
+ public void postSuperstep() throws Exception {}
+
+ public void setNewVertexValue(VertexValue newValue) {
+ outVal.setField(1, newValue);
+ out.collect(outVal);
+ }
+
+ public int getSuperstep() {
+ return this.runtimeContext.getSuperstepNumber();
+ }
+
+ public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+ return this.runtimeContext.<T>getIterationAggregator(name);
+ }
+
+ public <T extends Value> T getPreviousIterationAggregate(String name) {
+ return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // internal methods
+ // --------------------------------------------------------------------------------------------
+
+ private IterationRuntimeContext runtimeContext;
+
+ private Collector<Record> out;
+
+ private Record outVal;
+
+
+ void init(IterationRuntimeContext context) {
+ this.runtimeContext = context;
+ }
+
+ void setOutput(Record val, Collector<Record> out) {
+ this.out = out;
+ this.outVal = val;
+ }
+
+ // serializability
+ private static final long serialVersionUID = 1L;
+}
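
For orientation, a minimal sketch of a concrete subclass, assuming LongValue is used for both the vertex key and the vertex value and that messages carry candidate component ids; this class is illustrative only and not part of the commit.

    import org.apache.flink.spargel.java.record.MessageIterator;
    import org.apache.flink.spargel.java.record.VertexUpdateFunction;
    import org.apache.flink.types.LongValue;

    // Illustrative only: adopt the smallest component id seen in any message.
    public class MinIdUpdater extends VertexUpdateFunction<LongValue, LongValue, LongValue> {

        private static final long serialVersionUID = 1L;

        @Override
        public void updateVertex(LongValue vertexKey, LongValue vertexValue,
                MessageIterator<LongValue> inMessages) {
            long min = vertexValue.getValue();
            while (inMessages.hasNext()) {
                min = Math.min(min, inMessages.next().getValue());
            }
            // Emitting a new value re-activates the vertex for the next superstep;
            // staying silent keeps it out of the next workset.
            if (min < vertexValue.getValue()) {
                setNewVertexValue(new LongValue(min));
            }
        }
    }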
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
new file mode 100644
index 0000000..678b5e1
--- /dev/null
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.spargel.java;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.PlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
+import org.apache.flink.test.compiler.util.CompilerTestBase;
+
+
+public class SpargelCompilerTest extends CompilerTestBase {
+
+ @Test
+ public void testSpargelCompiler() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ // compose test program
+ {
+ DataSet<Long> vertexIds = env.generateSequence(1, 2);
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+
+ DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+ DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
+
+ result.print();
+ }
+
+ Plan p = env.createProgramPlan("Spargel Connected Components");
+ OptimizedPlan op = compileNoStats(p);
+
+ // check the sink
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+ assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
+
+ // check the iteration
+ WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+ assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
+
+ // check the solution set join and the delta
+ PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+ assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+
+ DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+ assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
+ assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+ assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+
+ // check the workset join
+ DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+ assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
+ assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+ assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+
+ assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+
+ // check that the initial partitioning is pushed out of the loop
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+ assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+ assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+
+ // check that the initial workset sort is outside the loop
+ assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
+ assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSpargelCompilerWithBroadcastVariable() {
+ try {
+ final String BC_VAR_NAME = "borat variable";
+
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+ // compose test program
+ {
+ DataSet<Long> bcVar = env.fromElements(1L);
+
+ DataSet<Long> vertexIds = env.generateSequence(1, 2);
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<Long, Long>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L));
+
+ DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+
+ VertexCentricIteration<Long, Long, Long, ?> vcIter = VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100);
+ vcIter.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
+ vcIter.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
+
+ DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(vcIter);
+
+ result.print();
+ }
+
+ Plan p = env.createProgramPlan("Spargel Connected Components");
+ OptimizedPlan op = compileNoStats(p);
+
+ // check the sink
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+ assertEquals(DEFAULT_PARALLELISM, sink.getDegreeOfParallelism());
+
+ // check the iteration
+ WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+ assertEquals(DEFAULT_PARALLELISM, iteration.getDegreeOfParallelism());
+
+ // check the solution set join and the delta
+ PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+ assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+
+ DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+ assertEquals(DEFAULT_PARALLELISM, ssJoin.getDegreeOfParallelism());
+ assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+ assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+
+ // check the workset join
+ DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+ assertEquals(DEFAULT_PARALLELISM, edgeJoin.getDegreeOfParallelism());
+ assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+ assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+
+ assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+
+ // check that the initial partitioning is pushed out of the loop
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+ assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+ assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
new file mode 100644
index 0000000..e862e7c
--- /dev/null
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.spargel.java;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.DeltaIteration;
+import org.apache.flink.api.java.DeltaIterationResultSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.spargel.java.MessageIterator;
+import org.apache.flink.spargel.java.MessagingFunction;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.VertexUpdateFunction;
+
+@SuppressWarnings("serial")
+public class SpargelTranslationTest {
+
+ @Test
+ public void testTranslationPlainEdges() {
+ try {
+ final String ITERATION_NAME = "Test Name";
+
+ final String AGGREGATOR_NAME = "AggregatorName";
+
+ final String BC_SET_MESSAGES_NAME = "borat messages";
+
+ final String BC_SET_UPDATES_NAME = "borat updates";
+
+ final int NUM_ITERATIONS = 13;
+
+ final int ITERATION_DOP = 77;
+
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> bcMessaging = env.fromElements(1L);
+ DataSet<Long> bcUpdate = env.fromElements(1L);
+
+ DataSet<Tuple2<String, Double>> result;
+
+ // ------------ construct the test program ------------------
+ {
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+
+
+ VertexCentricIteration<String, Double, Long, ?> vertexIteration =
+ VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
+ vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcMessaging);
+ vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
+
+ vertexIteration.setName(ITERATION_NAME);
+ vertexIteration.setParallelism(ITERATION_DOP);
+
+ vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+
+ result = initialVertices.runOperation(vertexIteration);
+ }
+
+
+ // ------------- validate the java program ----------------
+
+ assertTrue(result instanceof DeltaIterationResultSet);
+
+ DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+ DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+
+ // check the basic iteration properties
+ assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+ assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+ assertEquals(ITERATION_DOP, iteration.getParallelism());
+ assertEquals(ITERATION_NAME, iteration.getName());
+
+ assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+
+ // validate that the semantic properties are set as they should
+ TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+ assertTrue(solutionSetJoin.getSematicProperties().getForwardedField1(0).contains(0));
+ assertTrue(solutionSetJoin.getSematicProperties().getForwardedField2(0).contains(0));
+
+ TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+
+ // validate that the broadcast sets are forwarded
+ assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+ assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
+ try {
+ final String ITERATION_NAME = "Test Name";
+
+ final String AGGREGATOR_NAME = "AggregatorName";
+
+ final String BC_SET_MESSAGES_NAME = "borat messages";
+
+ final String BC_SET_UPDATES_NAME = "borat updates";
+
+ final int NUM_ITERATIONS = 13;
+
+ final int ITERATION_DOP = 77;
+
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> bcVar = env.fromElements(1L);
+
+ DataSet<Tuple2<String, Double>> result;
+
+ // ------------ construct the test program ------------------
+ {
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
+
+
+ VertexCentricIteration<String, Double, Long, ?> vertexIteration =
+ VertexCentricIteration.withPlainEdges(edges, new UpdateFunction(), new MessageFunctionNoEdgeValue(), NUM_ITERATIONS);
+ vertexIteration.addBroadcastSetForMessagingFunction(BC_SET_MESSAGES_NAME, bcVar);
+ vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
+
+ vertexIteration.setName(ITERATION_NAME);
+ vertexIteration.setParallelism(ITERATION_DOP);
+
+ vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+
+ result = initialVertices.runOperation(vertexIteration);
+ }
+
+
+ // ------------- validate the java program ----------------
+
+ assertTrue(result instanceof DeltaIterationResultSet);
+
+ DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+ DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+
+ // check the basic iteration properties
+ assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+ assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+ assertEquals(ITERATION_DOP, iteration.getParallelism());
+ assertEquals(ITERATION_NAME, iteration.getName());
+
+ assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+
+ // validate that the semantic properties are set as they should
+ TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+ assertTrue(solutionSetJoin.getSematicProperties().getForwardedField1(0).contains(0));
+ assertTrue(solutionSetJoin.getSematicProperties().getForwardedField2(0).contains(0));
+
+ TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+
+ // validate that the broadcast sets are forwarded
+ assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+ assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static class UpdateFunction extends VertexUpdateFunction<String, Double, Long> {
+
+ @Override
+ public void updateVertex(String vertexKey, Double vertexValue, MessageIterator<Long> inMessages) {}
+ }
+
+ public static class MessageFunctionNoEdgeValue extends MessagingFunction<String, Double, Long, Object> {
+
+ @Override
+ public void sendMessages(String vertexKey, Double vertexValue) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
new file mode 100644
index 0000000..a34f2db
--- /dev/null
+++ b/flink-addons/flink-spargel/src/test/java/org/apache/flink/test/spargel/SpargelConnectedComponentsITCase.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.spargel;
+
+import java.io.BufferedReader;
+
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.spargel.java.VertexCentricIteration;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
+import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class SpargelConnectedComponentsITCase extends JavaProgramTestBase {
+
+ private static final long SEED = 9487520347802987L;
+
+ private static final int NUM_VERTICES = 1000;
+
+ private static final int NUM_EDGES = 10000;
+
+ private String resultPath;
+
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempFilePath("results");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
+ DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
+
+ DataSet<Tuple2<Long, Long>> edges = edgeString.map(new EdgeParser());
+
+ DataSet<Tuple2<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+ DataSet<Tuple2<Long, Long>> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100));
+
+ result.writeAsCsv(resultPath, "\n", " ");
+ env.execute("Spargel Connected Components");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ for (BufferedReader reader : getResultReader(resultPath)) {
+ ConnectedComponentsData.checkOddEvenResult(reader);
+ }
+ }
+
+ public static final class EdgeParser extends MapFunction<String, Tuple2<Long, Long>> {
+ public Tuple2<Long, Long> map(String value) {
+ String[] nums = value.split(" ");
+ return new Tuple2<Long, Long>(Long.parseLong(nums[0]), Long.parseLong(nums[1]));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/pom.xml b/flink-addons/flink-yarn/pom.xml
new file mode 100644
index 0000000..6c2d130
--- /dev/null
+++ b/flink-addons/flink-yarn/pom.xml
@@ -0,0 +1,60 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-addons</artifactId>
+ <version>0.6-incubating-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-yarn</artifactId>
+ <name>flink-yarn</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
new file mode 100644
index 0000000..40635dc
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.flink.yarn;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Preconditions;
+
+public class ApplicationMaster {
+
+ private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+
+ private void run() throws Exception {
+ //Utils.logFilesInCurrentDirectory(LOG);
+ // Initialize clients to ResourceManager and NodeManagers
+ Configuration conf = Utils.initializeYarnConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ Map<String, String> envs = System.getenv();
+ final String currDir = envs.get(Environment.PWD.key());
+ final String logDirs = envs.get(Environment.LOG_DIRS.key());
+ final String ownHostname = envs.get(Environment.NM_HOST.key());
+ final String appId = envs.get(Client.ENV_APP_ID);
+ final String clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
+ final String applicationMasterHost = envs.get(Environment.NM_HOST.key());
+ final String remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
+ final String shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
+ final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
+ final int taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
+ final int memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
+ final int coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
+
+ int heapLimit = Utils.calculateHeapSize(memoryPerTaskManager);
+
+ if(currDir == null) {
+ throw new RuntimeException("Current directory unknown");
+ }
+ if(ownHostname == null) {
+ throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
+ }
+ LOG.info("Working directory "+currDir);
+
+ // load Flink configuration.
+ Utils.getFlinkConfiguration(currDir);
+
+ final String localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
+
+ // Update yaml conf -> set jobManager address to this machine's address.
+ FileInputStream fis = new FileInputStream(currDir+"/flink-conf.yaml");
+ BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+ Writer output = new BufferedWriter(new FileWriter(currDir+"/flink-conf-modified.yaml"));
+ String line;
+ while ( (line = br.readLine()) != null) {
+ if(line.contains(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)) {
+ output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+ } else if(line.contains(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)) {
+ output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+"\n");
+ } else {
+ output.append(line+"\n");
+ }
+ }
+ // just to make sure.
+ output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+ output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
+ output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
+ output.close();
+ br.close();
+ File newConf = new File(currDir+"/flink-conf-modified.yaml");
+ if(!newConf.exists()) {
+ LOG.warn("modified yaml does not exist!");
+ }
+
+ Utils.copyJarContents("resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME,
+ ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+ JobManager jm;
+ {
+ String pathToNepheleConfig = currDir+"/flink-conf-modified.yaml";
+ String[] args = {"-executionMode","cluster", "-configDir", pathToNepheleConfig};
+
+ // start the job manager
+ jm = JobManager.initialize( args );
+
+ // Start info server for jobmanager
+ jm.startInfoServer();
+ }
+
+ AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
+ rmClient.init(conf);
+ rmClient.start();
+
+ NMClient nmClient = NMClient.createNMClient();
+ nmClient.init(conf);
+ nmClient.start();
+
+ // Register with ResourceManager
+ LOG.info("registering ApplicationMaster");
+ rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
+
+ // Priority for worker containers - priorities are intra-application
+ Priority priority = Records.newRecord(Priority.class);
+ priority.setPriority(0);
+
+ // Resource requirements for worker containers
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(memoryPerTaskManager);
+ capability.setVirtualCores(coresPerTaskManager);
+
+ // Make container requests to ResourceManager
+ for (int i = 0; i < taskManagerCount; ++i) {
+ ContainerRequest containerAsk = new ContainerRequest(capability,
+ null, null, priority);
+ LOG.info("Requesting TaskManager container " + i);
+ rmClient.addContainerRequest(containerAsk);
+ }
+
+ LocalResource flinkJar = Records.newRecord(LocalResource.class);
+ LocalResource flinkConf = Records.newRecord(LocalResource.class);
+
+ // register Flink Jar with remote HDFS
+ final Path remoteJarPath = new Path(remoteFlinkJarPath);
+ Utils.registerLocalResource(fs, remoteJarPath, flinkJar);
+
+ // register conf with local fs.
+ Path remoteConfPath = Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/flink-conf-modified.yaml"), flinkConf, new Path(clientHomeDir));
+ LOG.info("Prepared localresource for modified yaml: "+flinkConf);
+
+
+ boolean hasLog4j = new File(currDir+"/log4j.properties").exists();
+ // prepare the files to ship
+ LocalResource[] remoteShipRsc = null;
+ String[] remoteShipPaths = shipListString.split(",");
+ if(!shipListString.isEmpty()) {
+ remoteShipRsc = new LocalResource[remoteShipPaths.length];
+ { // scope for i
+ int i = 0;
+ for(String remoteShipPathStr : remoteShipPaths) {
+ if(remoteShipPathStr == null || remoteShipPathStr.isEmpty()) {
+ continue;
+ }
+ remoteShipRsc[i] = Records.newRecord(LocalResource.class);
+ Path remoteShipPath = new Path(remoteShipPathStr);
+ Utils.registerLocalResource(fs, remoteShipPath, remoteShipRsc[i]);
+ i++;
+ }
+ }
+ }
+
+ // respect custom JVM options in the YAML file
+ final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+ // Obtain allocated containers and launch
+ int allocatedContainers = 0;
+ int completedContainers = 0;
+ while (allocatedContainers < taskManagerCount) {
+ AllocateResponse response = rmClient.allocate(0);
+ for (Container container : response.getAllocatedContainers()) {
+ LOG.info("Got new Container for TM "+container.getId()+" on host "+container.getNodeId().getHost());
+ ++allocatedContainers;
+
+ // Launch the container by creating a ContainerLaunchContext
+ ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+ String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts;
+ if(hasLog4j) {
+ tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
+ }
+ tmCommand += " org.apache.flink.yarn.YarnTaskManagerRunner -configDir . "
+ + " 1>"
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ + "/taskmanager-stdout.log"
+ + " 2>"
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ + "/taskmanager-stderr.log";
+ ctx.setCommands(Collections.singletonList(tmCommand));
+
+ LOG.info("Starting TM with command="+tmCommand);
+
+ // copy resources to the TaskManagers.
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
+ localResources.put("flink.jar", flinkJar);
+ localResources.put("flink-conf.yaml", flinkConf);
+
+ // add ship resources
+ if(!shipListString.isEmpty()) {
+ Preconditions.checkNotNull(remoteShipRsc);
+ for( int i = 0; i < remoteShipPaths.length; i++) {
+ localResources.put(new Path(remoteShipPaths[i]).getName(), remoteShipRsc[i]);
+ }
+ }
+
+
+ ctx.setLocalResources(localResources);
+
+ // Set up the CLASSPATH for the container (= TaskManager)
+ Map<String, String> containerEnv = new HashMap<String, String>();
+ Utils.setupEnv(conf, containerEnv); //add flink.jar to class path.
+ containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername);
+
+ ctx.setEnvironment(containerEnv);
+
+ UserGroupInformation user = UserGroupInformation.getCurrentUser();
+ try {
+ Credentials credentials = user.getCredentials();
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
+ 0, dob.getLength());
+ ctx.setTokens(securityTokens);
+ } catch (IOException e) {
+ LOG.warn("Getting current user info failed when trying to launch the container"
+ + e.getMessage());
+ }
+
+ LOG.info("Launching container " + allocatedContainers);
+ nmClient.startContainer(container, ctx);
+ }
+ for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+ ++completedContainers;
+ LOG.info("Completed container (while allocating) "+status.getContainerId()+". Total Completed:" + completedContainers);
+ LOG.info("Diagnostics "+status.getDiagnostics());
+ }
+ Thread.sleep(100);
+ }
+
+ // Now wait for containers to complete
+
+ while (completedContainers < taskManagerCount) {
+ AllocateResponse response = rmClient.allocate(completedContainers
+ / taskManagerCount);
+ for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+ ++completedContainers;
+ LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
+ LOG.info("Diagnostics "+status.getDiagnostics());
+ }
+ Thread.sleep(5000);
+ }
+ LOG.info("Shutting down JobManager");
+ jm.shutdown();
+
+ // Un-register with ResourceManager
+ rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
+
+
+ }
+ public static void main(String[] args) throws Exception {
+ final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
+ LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
+ + " user to execute Flink ApplicationMaster/JobManager to '"+yarnClientUsername+"'");
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+ for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
+ ugi.addToken(toks);
+ }
+ ugi.doAs(new PrivilegedAction<Object>() {
+ @Override
+ public Object run() {
+ try {
+ new ApplicationMaster().run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
new file mode 100644
index 0000000..6d4c7b5
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -0,0 +1,633 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.flink.yarn;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarFile;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+/**
+ * All classes in this package contain code taken from
+ * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
+ * and
+ * https://github.com/hortonworks/simple-yarn-app
+ * and
+ * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
+ *
+ * The Flink jar is uploaded to HDFS by this client.
+ * The application master and all TaskManager containers then have the jar file
+ * downloaded by YARN into their local file system.
+ *
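+ * <p>An illustrative invocation using the options defined below (jar path, configuration
+ * directory and sizes are examples only, not defaults):
+ * <pre>
+ * java org.apache.flink.yarn.Client -j /path/to/flink-uberjar.jar -c /path/to/conf -n 4 -jm 512 -tm 1024
+ * </pre>
+ *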
+ */
+public class Client {
+ private static final Log LOG = LogFactory.getLog(Client.class);
+
+ /**
+ * Command Line argument options
+ */
+ private static final Option QUERY = new Option("q","query",false, "Display available YARN resources (memory, cores)");
+ private static final Option VERBOSE = new Option("v","verbose",false, "Verbose debug mode");
+ private static final Option GEN_CONF = new Option("g","generateConf",false, "Place default configuration file in current directory");
+ private static final Option QUEUE = new Option("qu","queue",true, "Specify YARN queue.");
+ private static final Option SHIP_PATH = new Option("s","ship",true, "Ship files in the specified directory");
+ private static final Option FLINK_CONF_DIR = new Option("c","confDir",true, "Path to Flink configuration directory");
+ private static final Option FLINK_JAR = new Option("j","jar",true, "Path to Flink jar file");
+ private static final Option JM_MEMORY = new Option("jm","jobManagerMemory",true, "Memory for JobManager Container [in MB]");
+ private static final Option TM_MEMORY = new Option("tm","taskManagerMemory",true, "Memory per TaskManager Container [in MB]");
+ private static final Option TM_CORES = new Option("tmc","taskManagerCores",true, "Virtual CPU cores per TaskManager");
+ private static final Option CONTAINER = new Option("n","container",true, "Number of YARN containers to allocate (= number of"
+ + " TaskManagers)");
+
+ /**
+ * Constants
+ */
+ // environment variable names
+ public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
+ public static final String ENV_TM_CORES = "_CLIENT_TM_CORES";
+ public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
+ public static final String ENV_APP_ID = "_APP_ID";
+ public static final String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+ public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
+ public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
+ public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+
+ private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
+
+
+
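+ // Hadoop/YARN configuration; initialized in run() via Utils.initializeYarnConfiguration()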
+ private Configuration conf;
+
+ public void run(String[] args) throws Exception {
+
+ if(UserGroupInformation.isSecurityEnabled()) {
+ throw new RuntimeException("Flink YARN client does not have security support right now. "
+ + "File a bug, we will fix it ASAP.");
+ }
+ //Utils.logFilesInCurrentDirectory(LOG);
+ //
+ // Command Line Options
+ //
+ Options options = new Options();
+ options.addOption(VERBOSE);
+ options.addOption(FLINK_CONF_DIR);
+ options.addOption(FLINK_JAR);
+ options.addOption(JM_MEMORY);
+ options.addOption(TM_MEMORY);
+ options.addOption(TM_CORES);
+ options.addOption(CONTAINER);
+ options.addOption(GEN_CONF);
+ options.addOption(QUEUE);
+ options.addOption(QUERY);
+ options.addOption(SHIP_PATH);
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = null;
+ try {
+ cmd = parser.parse( options, args);
+ } catch(MissingOptionException moe) {
+ System.out.println(moe.getMessage());
+ printUsage();
+ System.exit(1);
+ }
+
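+ // if the user did not supply a log4j configuration, install a console appender
+ // so that client-side log output becomes visible on stderr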
+ if (System.getProperty("log4j.configuration") == null) {
+ Logger root = Logger.getRootLogger();
+ root.removeAllAppenders();
+ PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
+ ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
+ root.addAppender(appender);
+ if(cmd.hasOption(VERBOSE.getOpt())) {
+ root.setLevel(Level.DEBUG);
+ LOG.debug("CLASSPATH: "+System.getProperty("java.class.path"));
+ } else {
+ root.setLevel(Level.INFO);
+ }
+ }
+
+
+ // Jar Path
+ Path localJarPath;
+ if(cmd.hasOption(FLINK_JAR.getOpt())) {
+ String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
+ if(!userPath.startsWith("file://")) {
+ userPath = "file://" + userPath;
+ }
+ localJarPath = new Path(userPath);
+ } else {
+ localJarPath = new Path("file://"+Client.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+ }
+
+ if(cmd.hasOption(GEN_CONF.getOpt())) {
+ LOG.info("Placing default configuration in current directory");
+ File outFile = generateDefaultConf(localJarPath);
+ LOG.info("File written to "+outFile.getAbsolutePath());
+ System.exit(0);
+ }
+
+ // Conf Path
+ Path confPath = null;
+ String confDirPath = "";
+ if(cmd.hasOption(FLINK_CONF_DIR.getOpt())) {
+ confDirPath = cmd.getOptionValue(FLINK_CONF_DIR.getOpt())+"/";
+ File confFile = new File(confDirPath+CONFIG_FILE_NAME);
+ if(!confFile.exists()) {
+ LOG.fatal("Unable to locate configuration file in "+confFile);
+ System.exit(1);
+ }
+ confPath = new Path(confFile.getAbsolutePath());
+ } else {
+ System.out.println("No configuration file has been specified");
+
+ // no configuration path given.
+ // -> see if there is one in the current directory
+ File currDir = new File(".");
+ File[] candidates = currDir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(final File dir, final String name) {
+ return name != null && name.endsWith(".yaml");
+ }
+ });
+ if(candidates == null || candidates.length == 0) {
+ System.out.println("No configuration file has been found in current directory.\n"
+ + "Copying default.");
+ File outFile = generateDefaultConf(localJarPath);
+ confPath = new Path(outFile.toURI());
+ } else {
+ if(candidates.length > 1) {
+ System.out.println("Multiple .yaml configuration files were found in the current directory\n"
+ + "Please specify one explicitly");
+ System.exit(1);
+ } else if(candidates.length == 1) {
+ confPath = new Path(candidates[0].toURI());
+ }
+ }
+ }
+ List<File> shipFiles = new ArrayList<File>();
+ // path to directory to ship
+ if(cmd.hasOption(SHIP_PATH.getOpt())) {
+ String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
+ File shipDir = new File(shipPath);
+ if(shipDir.isDirectory()) {
+ shipFiles = new ArrayList<File>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return !(name.equals(".") || name.equals("..") );
+ }
+ })));
+ } else {
+ LOG.warn("Ship directory is not a directory!");
+ }
+ }
+ boolean hasLog4j = false;
+ //check if there is a log4j file
+ if(confDirPath.length() > 0) {
+ File l4j = new File(confDirPath+"/log4j.properties");
+ if(l4j.exists()) {
+ shipFiles.add(l4j);
+ hasLog4j = true;
+ }
+ }
+
+ // queue
+ String queue = "default";
+ if(cmd.hasOption(QUEUE.getOpt())) {
+ queue = cmd.getOptionValue(QUEUE.getOpt());
+ }
+
+ // JobManager Memory
+ int jmMemory = 512;
+ if(cmd.hasOption(JM_MEMORY.getOpt())) {
+ jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
+ }
+
+ // Task Managers memory
+ int tmMemory = 1024;
+ if(cmd.hasOption(TM_MEMORY.getOpt())) {
+ tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
+ }
+
+ // Task Managers vcores
+ int tmCores = 1;
+ if(cmd.hasOption(TM_CORES.getOpt())) {
+ tmCores = Integer.valueOf(cmd.getOptionValue(TM_CORES.getOpt()));
+ }
+ Utils.getFlinkConfiguration(confPath.toUri().getPath());
+ int jmPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0);
+ if(jmPort == 0) {
+ LOG.warn("Unable to find job manager port in configuration!");
+ jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
+ }
+ conf = Utils.initializeYarnConfiguration();
+
+ // initialize HDFS
+ LOG.info("Copy App Master jar from local filesystem and add to local environment");
+ // Copy the application master jar to the filesystem
+ // Create a local resource to point to the destination jar path
+ final FileSystem fs = FileSystem.get(conf);
+
+ if(fs.getScheme().startsWith("file")) {
+ LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ + "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
+ + "The Flink YARN client needs to store its files in a distributed file system");
+ }
+
+ // Create yarnClient
+ final YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+
+ // Query cluster for metrics
+ if(cmd.hasOption(QUERY.getOpt())) {
+ showClusterMetrics(yarnClient);
+ }
+ if(!cmd.hasOption(CONTAINER.getOpt())) {
+ LOG.fatal("Missing required argument "+CONTAINER.getOpt());
+ printUsage();
+ yarnClient.stop();
+ System.exit(1);
+ }
+
+ // TM Count
+ final int taskManagerCount = Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()));
+
+ System.out.println("Using values:");
+ System.out.println("\tContainer Count = "+taskManagerCount);
+ System.out.println("\tJar Path = "+localJarPath.toUri().getPath());
+ System.out.println("\tConfiguration file = "+confPath.toUri().getPath());
+ System.out.println("\tJobManager memory = "+jmMemory);
+ System.out.println("\tTaskManager memory = "+tmMemory);
+ System.out.println("\tTaskManager cores = "+tmCores);
+
+ // Create application via yarnClient
+ YarnClientApplication app = yarnClient.createApplication();
+ GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+ Resource maxRes = appResponse.getMaximumResourceCapability();
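+ // reject requests that exceed the largest single container YARN can allocate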
+ if(tmMemory > maxRes.getMemory() || tmCores > maxRes.getVirtualCores()) {
+ LOG.fatal("The cluster does not have the requested resources for the TaskManagers available!\n"
+ + "Maximum Memory: "+maxRes.getMemory() +", Maximum Cores: "+tmCores);
+ yarnClient.stop();
+ System.exit(1);
+ }
+ if(jmMemory > maxRes.getMemory() ) {
+ LOG.fatal("The cluster does not have the requested resources for the JobManager available!\n"
+ + "Maximum Memory: "+maxRes.getMemory());
+ yarnClient.stop();
+ System.exit(1);
+ }
+ int totalMemoryRequired = jmMemory + tmMemory * taskManagerCount;
+ ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+ if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+ LOG.fatal("This YARN session requires "+totalMemoryRequired+"MB of memory in the cluster. "
+ + "There are currently only "+freeClusterMem.totalFreeMemory+"MB available.");
+ yarnClient.stop();
+ System.exit(1);
+ }
+ if( tmMemory > freeClusterMem.containerLimit) {
+ LOG.fatal("The requested amount of memory for the TaskManagers ("+tmMemory+"MB) is more than "
+ + "the largest possible YARN container: "+freeClusterMem.containerLimit);
+ yarnClient.stop();
+ System.exit(1);
+ }
+ if( jmMemory > freeClusterMem.containerLimit) {
+ LOG.fatal("The requested amount of memory for the JobManager ("+jmMemory+"MB) is more than "
+ + "the largest possible YARN container: "+freeClusterMem.containerLimit);
+ yarnClient.stop();
+ System.exit(1);
+ }
+
+ // respect custom JVM options in the YAML file
+ final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records
+ .newRecord(ContainerLaunchContext.class);
+
+ String amCommand = "$JAVA_HOME/bin/java"
+ + " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts;
+ if(hasLog4j) {
+ amCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
+ }
+ amCommand += " org.apache.flink.yarn.ApplicationMaster" + " "
+ + " 1>"
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
+ + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
+ amContainer.setCommands(Collections.singletonList(amCommand));
+
+ System.err.println("amCommand="+amCommand);
+
+ // Set-up ApplicationSubmissionContext for the application
+ ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+ final ApplicationId appId = appContext.getApplicationId();
+
+ // Setup jar for ApplicationMaster
+ LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+ LocalResource flinkConf = Records.newRecord(LocalResource.class);
+ Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), localJarPath, appMasterJar, fs.getHomeDirectory());
+ Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), confPath, flinkConf, fs.getHomeDirectory());
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
+ localResources.put("flink.jar", appMasterJar);
+ localResources.put("flink-conf.yaml", flinkConf);
+
+
+ // paths for which security tokens are obtained (token code adapted from Apache Storm):
+ // [0] = jar, [1] = conf, [2] = application directory on HDFS, [3..] = shipped files
+ final Path[] paths = new Path[3 + shipFiles.size()];
+ StringBuilder envShipFileList = new StringBuilder();
+ // upload ship files
+ for (int i = 0; i < shipFiles.size(); i++) {
+ File shipFile = shipFiles.get(i);
+ LocalResource shipResources = Records.newRecord(LocalResource.class);
+ Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
+ paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
+ shipLocalPath, shipResources, fs.getHomeDirectory());
+ localResources.put(shipFile.getName(), shipResources);
+
+ envShipFileList.append(paths[3 + i]);
+ if(i+1 < shipFiles.size()) {
+ envShipFileList.append(',');
+ }
+ }
+
+ paths[0] = remotePathJar;
+ paths[1] = remotePathConf;
+ paths[2] = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+ FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+ fs.setPermission(paths[2], permission); // set permission for path.
+ Utils.setTokensFor(amContainer, paths, this.conf);
+
+
+ amContainer.setLocalResources(localResources);
+ fs.close();
+
+ // Setup CLASSPATH for ApplicationMaster
+ Map<String, String> appMasterEnv = new HashMap<String, String>();
+ Utils.setupEnv(conf, appMasterEnv);
+ // set configuration values
+ appMasterEnv.put(Client.ENV_TM_COUNT, String.valueOf(taskManagerCount));
+ appMasterEnv.put(Client.ENV_TM_CORES, String.valueOf(tmCores));
+ appMasterEnv.put(Client.ENV_TM_MEMORY, String.valueOf(tmMemory));
+ appMasterEnv.put(Client.FLINK_JAR_PATH, remotePathJar.toString() );
+ appMasterEnv.put(Client.ENV_APP_ID, appId.toString());
+ appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
+ appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
+ appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
+
+ amContainer.setEnvironment(appMasterEnv);
+
+ // Set up resource type requirements for ApplicationMaster
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(jmMemory);
+ capability.setVirtualCores(1);
+
+ appContext.setApplicationName("Flink"); // application name
+ appContext.setAMContainerSpec(amContainer);
+ appContext.setResource(capability);
+ appContext.setQueue(queue);
+
+ // file that we write into the conf/ dir containing the jobManager address.
+ final File addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
+
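+ // shutdown hook: kill the YARN application and remove the uploaded files when the client terminates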
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Killing the Flink-YARN application.");
+ yarnClient.killApplication(appId);
+ LOG.info("Deleting files in "+paths[2]);
+ FileSystem shutFS = FileSystem.get(conf);
+ shutFS.delete(paths[2], true); // delete conf and jar file.
+ shutFS.close();
+ } catch (Exception e) {
+ LOG.warn("Exception while killing the YARN application", e);
+ }
+ try {
+ addrFile.delete();
+ } catch (Exception e) {
+ LOG.warn("Exception while deleting the jobmanager address file", e);
+ }
+ LOG.info("YARN Client is shutting down");
+ yarnClient.stop();
+ }
+ });
+
+ LOG.info("Submitting application master " + appId);
+ yarnClient.submitApplication(appContext);
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ YarnApplicationState appState = appReport.getYarnApplicationState();
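+ // poll the application report until the application reaches a terminal state,
+ // showing a simple text spinner on stderr until the JobManager is up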
+ boolean told = false;
+ char[] el = { '/', '|', '\\', '-'};
+ int i = 0;
+ while (appState != YarnApplicationState.FINISHED
+ && appState != YarnApplicationState.KILLED
+ && appState != YarnApplicationState.FAILED) {
+ if(!told && appState == YarnApplicationState.RUNNING) {
+ System.err.println("Flink JobManager is now running on "+appReport.getHost()+":"+jmPort);
+ System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
+ // write jobmanager connect information
+
+ PrintWriter out = new PrintWriter(addrFile);
+ out.println(appReport.getHost()+":"+jmPort);
+ out.close();
+ addrFile.setReadable(true, false); // readable for all.
+ told = true;
+ }
+ if(!told) {
+ System.err.print(el[i++]+"\r");
+ if(i == el.length) {
+ i = 0;
+ }
+ Thread.sleep(500); // wait for the application to switch to RUNNING
+ } else {
+ Thread.sleep(5000);
+ }
+
+ appReport = yarnClient.getApplicationReport(appId);
+ appState = appReport.getYarnApplicationState();
+ }
+
+ LOG.info("Application " + appId + " finished with"
+ + " state " + appState + " at " + appReport.getFinishTime());
+ if(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED ) {
+ LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
+ }
+
+ }
+ private static class ClusterResourceDescription {
+ public int totalFreeMemory;
+ public int containerLimit;
+ }
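+ /**
+ * Sums the free memory over all running NodeManagers. The largest per-node free
+ * amount is also recorded, since it bounds the biggest container that can be requested.
+ */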
+ private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
+ ClusterResourceDescription crd = new ClusterResourceDescription();
+ crd.totalFreeMemory = 0;
+ crd.containerLimit = 0;
+ List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+ for(NodeReport rep : nodes) {
+ int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
+ crd.totalFreeMemory += free;
+ if(free > crd.containerLimit) {
+ crd.containerLimit = free;
+ }
+ }
+ return crd;
+ }
+
+ private void printUsage() {
+ System.out.println("Usage:");
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.setWidth(200);
+ formatter.setLeftPadding(5);
+ formatter.setSyntaxPrefix(" Required");
+ Options req = new Options();
+ req.addOption(CONTAINER);
+ formatter.printHelp(" ", req);
+
+ formatter.setSyntaxPrefix(" Optional");
+ Options opt = new Options();
+ opt.addOption(VERBOSE);
+ // opt.addOption(GEN_CONF);
+ // opt.addOption(FLINK_CONF_DIR);
+ // opt.addOption(FLINK_JAR);
+ opt.addOption(JM_MEMORY);
+ opt.addOption(TM_MEMORY);
+ opt.addOption(TM_CORES);
+ opt.addOption(QUERY);
+ opt.addOption(QUEUE);
+ formatter.printHelp(" ", opt);
+ }
+
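+ /**
+ * Prints the number of NodeManagers, each node's resources, and the queue status,
+ * then stops the client and exits.
+ */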
+ private void showClusterMetrics(YarnClient yarnClient)
+ throws YarnException, IOException {
+ YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
+ System.out.println("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
+ List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+ final String format = "|%-16s |%-16s %n";
+ System.out.format(format, "Property", "Value");
+ System.out.println("+---------------------------------------+");
+ int totalMemory = 0;
+ int totalCores = 0;
+ for(NodeReport rep : nodes) {
+ final Resource res = rep.getCapability();
+ totalMemory += res.getMemory();
+ totalCores += res.getVirtualCores();
+ System.out.format(format, "NodeID", rep.getNodeId());
+ System.out.format(format, "Memory", res.getMemory()+" MB");
+ System.out.format(format, "vCores", res.getVirtualCores());
+ System.out.format(format, "HealthReport", rep.getHealthReport());
+ System.out.format(format, "Containers", rep.getNumContainers());
+ System.out.println("+---------------------------------------+");
+ }
+ System.out.println("Summary: totalMemory "+totalMemory+" totalCores "+totalCores);
+ List<QueueInfo> qInfo = yarnClient.getAllQueues();
+ for(QueueInfo q : qInfo) {
+ System.out.println("Queue: "+q.getQueueName()+", Current Capacity: "+q.getCurrentCapacity()+" Max Capacity: "+q.getMaximumCapacity()+" Applications: "+q.getApplications().size());
+ }
+ yarnClient.stop();
+ System.exit(0);
+ }
+
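+ /**
+ * Extracts the default flink-conf.yaml from the given jar (falling back to the copy
+ * on this class' classpath) and writes it into the current working directory.
+ */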
+ private File generateDefaultConf(Path localJarPath) throws IOException,
+ FileNotFoundException {
+ JarFile jar = null;
+ try {
+ jar = new JarFile(localJarPath.toUri().getPath());
+ } catch(FileNotFoundException fne) {
+ LOG.fatal("Unable to access jar file. Specify jar file or configuration file.", fne);
+ System.exit(1);
+ }
+ ZipEntry confEntry = jar.getEntry("flink-conf.yaml");
+ InputStream confStream = confEntry == null ? null : jar.getInputStream(confEntry);
+
+ if(confStream == null) {
+ LOG.warn("Given jar file does not contain yaml conf.");
+ confStream = this.getClass().getResourceAsStream("flink-conf.yaml");
+ if(confStream == null) {
+ throw new RuntimeException("Unable to find flink-conf in jar file");
+ }
+ }
+ File outFile = new File("flink-conf.yaml");
+ if(outFile.exists()) {
+ throw new RuntimeException("File unexpectedly exists");
+ }
+ FileOutputStream outputStream = new FileOutputStream(outFile);
+ int read = 0;
+ byte[] bytes = new byte[1024];
+ while ((read = confStream.read(bytes)) != -1) {
+ outputStream.write(bytes, 0, read);
+ }
+ confStream.close();
+ outputStream.close();
+ jar.close();
+ return outFile;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Client c = new Client();
+ c.run(args);
+ }
+}