Posted to commits@flink.apache.org by se...@apache.org on 2015/03/04 20:44:58 UTC

[1/2] flink git commit: [FLINK-1648] Add auto-parallelism to select all available task slots

Repository: flink
Updated Branches:
  refs/heads/master 9c0d6347e -> d8d642fd6


[FLINK-1648] Add auto-parallelism to select all available task slots


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8d642fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8d642fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8d642fd

Branch: refs/heads/master
Commit: d8d642fd6d7d9b8526325d4efff1015f636c5ddb
Parents: 55f1508
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 20:44:13 2015 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |   6 +
 .../base/CollectorMapOperatorBase.java          |   1 +
 .../flink/runtime/jobmanager/JobManager.scala   |   9 ++
 .../flink/test/misc/AutoParallelismITCase.java  | 143 +++++++++++++++++++
 .../test/recovery/SimpleRecoveryITCase.java     |   2 +-
 5 files changed, 160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 81ce471..d315440 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -35,6 +35,12 @@ public class ExecutionConfig implements Serializable {
 	// Key for storing it in the Job Configuration
 	public static final String CONFIG_KEY = "runtime.config";
 
+	/**
+	 * The constant to use for the degree of parallelism, if the system should use the number
+	 *  of currently available slots.
+	 */
+	public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
+
 	private boolean useClosureCleaner = true;
 	private int degreeOfParallelism = -1;
 	private int numberOfExecutionRetries = -1;
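
For readers skimming the diff: PARALLELISM_AUTO_MAX is a sentinel, not a request for Integer.MAX_VALUE parallel subtasks. A minimal sketch of how a program opts in (illustrative only, not part of this commit; it assumes this tree's DataSet API, where count(), exercised by the companion commit below, runs the job eagerly):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class AutoMaxExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Sentinel: the JobManager rewrites this to the total number of
            // task slots in the cluster at job submission time (see below).
            env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);

            DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5);
            System.out.println("count = " + data.count());
        }
    }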

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index 62bdfbe..b7ff2ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
  * @see GenericCollectorMap
  */
 @Deprecated
+@SuppressWarnings("deprecation")
 public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
 	
 	public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 54d3cf2..e3e96e5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, File}
 import java.net.InetSocketAddress
 
 import akka.actor.Status.{Success, Failure}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
@@ -477,12 +478,20 @@ class JobManager(val configuration: Configuration,
           log.debug(s"Running initialization on master for job ${jobId} (${jobName}).")
         }
 
+        val numSlots = scheduler.getTotalNumberOfSlots()
+
         for (vertex <- jobGraph.getVertices.asScala) {
+
           val executableClass = vertex.getInvokableClassName
           if (executableClass == null || executableClass.length == 0) {
             throw new JobSubmissionException(jobId,
               s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.")
           }
+
+          if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+            vertex.setParallelism(numSlots)
+          }
+
           try {
             vertex.initializeOnMaster(userCodeLoader)
           }
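
The substitution above happens once, at submission time: any vertex still carrying the sentinel is rewritten to the scheduler's current total slot count, while explicitly configured vertices pass through untouched. A standalone sketch of that rule (hypothetical helper, for illustration; the authoritative logic is the Scala above):

    // Mirrors the JobManager substitution: the AUTO_MAX sentinel becomes
    // the number of registered task slots; explicit settings are kept.
    static int resolveParallelism(int requested, int totalSlots) {
        return requested == ExecutionConfig.PARALLELISM_AUTO_MAX
                ? totalSlots : requested;
    }

    // resolveParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX, 14) == 14
    // resolveParallelism(4, 14) == 4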

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
new file mode 100644
index 0000000..ea79a3a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the runtime.
+ */
+@SuppressWarnings("serial")
+public class AutoParallelismITCase {
+
+	private static final int NUM_TM = 2;
+	private static final int SLOTS_PER_TM = 7;
+	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void setupCluster() {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TM);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+
+		cluster = new ForkableFlinkMiniCluster(config, false);
+	}
+
+	@AfterClass
+	public static void teardownCluster() {
+		try {
+			cluster.stop();
+		}
+		catch (Throwable t) {
+			System.err.println("Error stopping cluster on shutdown");
+			t.printStackTrace();
+			fail("Cluster shutdown caused an exception: " + t.getMessage());
+		}
+	}
+
+
+	@Test
+	public void testProgramWithAutoParallelism() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getJobManagerRPCPort());
+
+			env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+			DataSet<Integer> result = env
+					.createInput(new ParallelismDependentInputFormat())
+					.rebalance()
+					.mapPartition(new ParallelismDependentMapPartition());
+
+			List<Integer> resultCollection = new ArrayList<Integer>();
+			result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+
+			env.execute();
+
+			assertEquals(PARALLELISM, resultCollection.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			try {
+				cluster.stop();
+			}
+			catch (Throwable t) {
+				// ignore exceptions on shutdown
+			}
+		}
+	}
+
+	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+		private transient boolean emitted;
+
+		@Override
+		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+			assertEquals(PARALLELISM, numSplits);
+			return super.createInputSplits(numSplits);
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return emitted;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			if (emitted) {
+				return null;
+			}
+			emitted = true;
+			return 1;
+		}
+	}
+
+	private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
+
+		@Override
+		public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
+			out.collect(getRuntimeContext().getIndexOfThisSubtask());
+		}
+	}
+}
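
The test pins the resolution down from two sides: createInputSplits() must be asked for exactly NUM_TM * SLOTS_PER_TM splits, and because each parallel instance of the input format emits exactly one record, which the map partition replaces with its subtask index, the result collection reaches size PARALLELISM only if every available slot was actually used.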

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 1591d67..8330109 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -37,9 +37,9 @@ import java.util.List;
 
 import static org.junit.Assert.*;
 
+@SuppressWarnings("serial")
 public class SimpleRecoveryITCase {
 
-
 	private static ForkableFlinkMiniCluster cluster;
 
 	@BeforeClass


[2/2] flink git commit: [tests] Simple code formatting in CountCollectITCase; suppress sysout output for the test.

Posted by se...@apache.org.
[tests] Simple code formatting in CountCollectITCase; suppress sysout output for the test.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55f1508a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55f1508a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55f1508a

Branch: refs/heads/master
Commit: 55f1508a553974f5d1af88e9fab10c3c915e27e5
Parents: 9c0d634
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 4 15:18:37 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 20:44:13 2015 +0100

----------------------------------------------------------------------
 .../flink/test/actions/CountCollectITCase.java  | 124 ++++++++++---------
 1 file changed, 68 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/55f1508a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
index 0369a56..c29b4f3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/actions/CountCollectITCase.java
@@ -22,69 +22,81 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 
 import static org.junit.Assert.*;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 
-
+/**
+ * Tests the methods that bring elements back to the client driver program.
+ */
 public class CountCollectITCase {
 
-    @Test
-    public void testCountCollectOnSimpleJob() throws Exception {
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.setDegreeOfParallelism(5);
-        
-        Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
-        
-        DataSet<Integer> data = env.fromElements(input);
-
-        // count
-        long numEntries = data.count();
-        assertEquals(10, numEntries);
-
-        // collect
-        ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
-        assertArrayEquals(input, list.toArray());
-
-    }
-    
-    @Test
-    public void testCountCollectOnAdvancedJob() throws Exception {
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-        env.setDegreeOfParallelism(5);
-        env.getConfig().disableObjectReuse();
-
-
-        DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-        DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
-        DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
-
-        // count
-        long numEntries = data3.count();
-        assertEquals(100, numEntries);
-
-        // collect
-        ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
-        System.out.println(list);
-
-        // set expected entries in a hash map to true
-        HashMap<Tuple2<Integer, Integer>, Boolean> expected = new HashMap<Tuple2<Integer, Integer>, Boolean>();
-        for (int i = 1; i <= 10; i++) {
-            for (int j = 1; j <= 10; j++) {
-                expected.put(new Tuple2<Integer, Integer>(i, j), true);
-            }
-        }
-
-        // check if all entries are contained in the hash map
-        for (int i = 0; i < 100; i++) {
-            Tuple2<Integer, Integer> element = list.get(i);
-            assertEquals(expected.get(element), true);
-            expected.remove(element);
-        }
-
-    }
+	@BeforeClass
+	public static void suppressStandardOut() {
+		java.io.OutputStream blackhole = new java.io.OutputStream() {
+			@Override
+			public void write(int b){}
+		};
+
+		System.setOut(new PrintStream(blackhole));
+		System.setErr(new PrintStream(blackhole));
+	}
+
+	@Test
+	public void testSimple() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(5);
+
+		Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+
+		DataSet<Integer> data = env.fromElements(input);
+
+		// count
+		long numEntries = data.count();
+		assertEquals(10, numEntries);
+
+		// collect
+		ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
+		assertArrayEquals(input, list.toArray());
+	}
+
+	@Test
+	public void testAdvanced() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(5);
+		env.getConfig().disableObjectReuse();
+
+
+		DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+		DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+		DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
+
+		// count
+		long numEntries = data3.count();
+		assertEquals(100, numEntries);
+
+		// collect
+		ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
+
+		// set expected entries in a hash map to true
+		HashMap<Tuple2<Integer, Integer>, Boolean> expected = new HashMap<Tuple2<Integer, Integer>, Boolean>();
+		for (int i = 1; i <= 10; i++) {
+			for (int j = 1; j <= 10; j++) {
+				expected.put(new Tuple2<Integer, Integer>(i, j), true);
+			}
+		}
+
+		// check if all entries are contained in the hash map
+		for (int i = 0; i < 100; i++) {
+			Tuple2<Integer, Integer> element = list.get(i);
+			assertEquals(expected.get(element), true);
+			expected.remove(element);
+		}
+	}
 }
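
One side effect of swapping the streams in @BeforeClass is that they stay redirected for anything else that runs in the same JVM fork afterwards. A companion sketch (not part of this commit) that saves and restores the originals, assuming the same JUnit 4 setup and an additional org.junit.AfterClass import:

    private static PrintStream originalOut;
    private static PrintStream originalErr;

    @BeforeClass
    public static void suppressStandardOut() {
        originalOut = System.out;
        originalErr = System.err;
        // ... install the blackhole streams as in the diff above ...
    }

    @AfterClass
    public static void restoreStreams() {
        System.setOut(originalOut);
        System.setErr(originalErr);
    }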