You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/04 20:44:58 UTC
[1/2] flink git commit: [FLINK-1648] Add auto-parallelism to select all available task slots
Repository: flink
Updated Branches:
refs/heads/master 9c0d6347e -> d8d642fd6
[FLINK-1648] Add auto-parallelism to select all available task slots
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8d642fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8d642fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8d642fd
Branch: refs/heads/master
Commit: d8d642fd6d7d9b8526325d4efff1015f636c5ddb
Parents: 55f1508
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 20:44:13 2015 +0100
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfig.java | 6 +
.../base/CollectorMapOperatorBase.java | 1 +
.../flink/runtime/jobmanager/JobManager.scala | 9 ++
.../flink/test/misc/AutoParallelismITCase.java | 143 +++++++++++++++++++
.../test/recovery/SimpleRecoveryITCase.java | 2 +-
5 files changed, 160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 81ce471..d315440 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -35,6 +35,12 @@ public class ExecutionConfig implements Serializable {
// Key for storing it in the Job Configuration
public static final String CONFIG_KEY = "runtime.config";
+ /**
+ * The constant to use for the degree of parallelism, if the system should use the number
+ * of currently available slots.
+ */
+ public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
+
private boolean useClosureCleaner = true;
private int degreeOfParallelism = -1;
private int numberOfExecutionRetries = -1;
http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index 62bdfbe..b7ff2ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
* @see GenericCollectorMap
*/
@Deprecated
+@SuppressWarnings("deprecation")
public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 54d3cf2..e3e96e5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, File}
import java.net.InetSocketAddress
import akka.actor.Status.{Success, Failure}
+import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.runtime.blob.BlobServer
@@ -477,12 +478,20 @@ class JobManager(val configuration: Configuration,
log.debug(s"Running initialization on master for job ${jobId} (${jobName}).")
}
+ val numSlots = scheduler.getTotalNumberOfSlots()
+
for (vertex <- jobGraph.getVertices.asScala) {
+
val executableClass = vertex.getInvokableClassName
if (executableClass == null || executableClass.length == 0) {
throw new JobSubmissionException(jobId,
s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.")
}
+
+ if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+ vertex.setParallelism(numSlots)
+ }
+
try {
vertex.initializeOnMaster(userCodeLoader)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
new file mode 100644
index 0000000..ea79a3a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the runtime.
+ */
+@SuppressWarnings("serial")
+public class AutoParallelismITCase {
+
+ private static final int NUM_TM = 2;
+ private static final int SLOTS_PER_TM = 7;
+ private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+ private static ForkableFlinkMiniCluster cluster;
+
+ @BeforeClass
+ public static void setupCluster() {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TM);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
+ config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+
+ cluster = new ForkableFlinkMiniCluster(config, false);
+ }
+
+ @AfterClass
+ public static void teardownCluster() {
+ try {
+ cluster.stop();
+ }
+ catch (Throwable t) {
+ System.err.println("Error stopping cluster on shutdown");
+ t.printStackTrace();
+ fail("Cluster shutdown caused an exception: " + t.getMessage());
+ }
+ }
+
+ /**
+ * Runs a program with {@link ExecutionConfig#PARALLELISM_AUTO_MAX} and checks that the job used
+ * one parallel instance per available slot (NUM_TM * SLOTS_PER_TM). The shared cluster is
+ * stopped only in {@link #teardownCluster()}, so this test must not stop it itself.
+ */
+ @Test
+ public void testProgramWithAutoParallelism() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getJobManagerRPCPort());
+
+ env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+ DataSet<Integer> result = env
+ .createInput(new ParallelismDependentInputFormat())
+ .rebalance()
+ .mapPartition(new ParallelismDependentMapPartition());
+
+ List<Integer> resultCollection = new ArrayList<Integer>();
+ result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+
+ env.execute();
+
+ assertEquals(PARALLELISM, resultCollection.size());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Input format that asserts it is asked for exactly PARALLELISM splits; each instance emits at most one record.
+ */
+ private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+ private transient boolean emitted;
+
+ @Override
+ public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+ assertEquals(PARALLELISM, numSplits);
+ return super.createInputSplits(numSplits);
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ return emitted;
+ }
+
+ @Override
+ public Integer nextRecord(Integer reuse) {
+ if (emitted) {
+ return null;
+ }
+ emitted = true;
+ return 1;
+ }
+ }
+
+ /** Emits its subtask index once per mapPartition call; the test counts these to infer the parallelism. */
+ private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
+
+ @Override
+ public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
+ out.collect(getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 1591d67..8330109 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -37,9 +37,9 @@ import java.util.List;
import static org.junit.Assert.*;
+@SuppressWarnings("serial")
public class SimpleRecoveryITCase {
-
private static ForkableFlinkMiniCluster cluster;
@BeforeClass
[2/2] flink git commit: [tests] Simple code format in CountCollectITCase and suppresses sysout output for test.
Posted by se...@apache.org.
[tests] Simple code format in CountCollectITCase and suppresses sysout output for test.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55f1508a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55f1508a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55f1508a
Branch: refs/heads/master
Commit: 55f1508a553974f5d1af88e9fab10c3c915e27e5
Parents: 9c0d634
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 4 15:18:37 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 20:44:13 2015 +0100
----------------------------------------------------------------------
.../flink/test/actions/CountCollectITCase.java | 124 ++++++++++---------
1 file changed, 68 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/55f1508a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
index 0369a56..c29b4f3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
@@ -22,69 +22,81 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import static org.junit.Assert.*;
+import org.junit.BeforeClass;
import org.junit.Test;
-
+/**
+ * Tests the methods that bring elements back to the client driver program.
+ */
public class CountCollectITCase {
- @Test
- public void testCountCollectOnSimpleJob() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(5);
-
- Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
-
- DataSet<Integer> data = env.fromElements(input);
-
- // count
- long numEntries = data.count();
- assertEquals(10, numEntries);
-
- // collect
- ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
- assertArrayEquals(input, list.toArray());
-
- }
-
- @Test
- public void testCountCollectOnAdvancedJob() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(5);
- env.getConfig().disableObjectReuse();
-
-
- DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
- DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
- DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
-
- // count
- long numEntries = data3.count();
- assertEquals(100, numEntries);
-
- // collect
- ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
- System.out.println(list);
-
- // set expected entries in a hash map to true
- HashMap<Tuple2<Integer, Integer>, Boolean> expected = new HashMap<Tuple2<Integer, Integer>, Boolean>();
- for (int i = 1; i <= 10; i++) {
- for (int j = 1; j <= 10; j++) {
- expected.put(new Tuple2<Integer, Integer>(i, j), true);
- }
- }
-
- // check if all entries are contained in the hash map
- for (int i = 0; i < 100; i++) {
- Tuple2<Integer, Integer> element = list.get(i);
- assertEquals(expected.get(element), true);
- expected.remove(element);
- }
-
- }
+ /** Console streams as they were when this class was initialized; restored after the tests. */
+ private static final PrintStream ORIGINAL_OUT = System.out;
+ private static final PrintStream ORIGINAL_ERR = System.err;
+
+ /**
+ * Discards everything written to System.out and System.err while this class's tests run.
+ */
+ @BeforeClass
+ public static void suppressStandardOut() {
+ java.io.OutputStream blackhole = new java.io.OutputStream() {
+ @Override
+ public void write(int b){}
+ };
+
+ System.setOut(new PrintStream(blackhole));
+ System.setErr(new PrintStream(blackhole));
+ }
+
+ /**
+ * Restores the original console streams so that test classes running later in the same
+ * JVM still produce output, including failure stack traces.
+ */
+ @org.junit.AfterClass
+ public static void restoreStandardOut() {
+ System.setOut(ORIGINAL_OUT);
+ System.setErr(ORIGINAL_ERR);
+ }
+
+ @Test
+ public void testSimple() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(5);
+
+ Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+
+ DataSet<Integer> data = env.fromElements(input);
+
+ // count
+ long numEntries = data.count();
+ assertEquals(10, numEntries);
+
+ // collect
+ ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
+ assertArrayEquals(input, list.toArray());
+ }
+
+ @Test
+ public void testAdvanced() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(5);
+ env.getConfig().disableObjectReuse();
+
+ DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+ DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
+
+ // count
+ long numEntries = data3.count();
+ assertEquals(100, numEntries);
+
+ // collect (check the size first so a short result fails clearly instead of with an index error)
+ ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
+ assertEquals(100, list.size());
+
+ // set expected entries in a hash map to true
+ HashMap<Tuple2<Integer, Integer>, Boolean> expected = new HashMap<Tuple2<Integer, Integer>, Boolean>();
+ for (int i = 1; i <= 10; i++) {
+ for (int j = 1; j <= 10; j++) {
+ expected.put(new Tuple2<Integer, Integer>(i, j), true);
+ }
+ }
+
+ // every collected element must be expected and may appear only once:
+ // remove() returns null for an unexpected or already-seen element
+ for (Tuple2<Integer, Integer> element : list) {
+ assertEquals(Boolean.TRUE, expected.remove(element));
+ }
+ }
}