You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/20 20:52:53 UTC

[2/2] incubator-flink git commit: [FLINK-1264] [compiler] Properly forward custom partitioners to the runtime

[FLINK-1264] [compiler] Properly forward custom partitioners to the runtime


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d0f2db06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d0f2db06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d0f2db06

Branch: refs/heads/master
Commit: d0f2db06e17d1a5d20adcf4f9ca555f4885774b9
Parents: b3e5ed0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 20 17:03:50 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 20 19:14:47 2014 +0100

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java | 16 +++--
 .../test/misc/CustomPartitioningITCase.java     | 73 ++++++++++++++++++++
 2 files changed, 82 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0f2db06/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 64eca7c..eb6fe2e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1072,19 +1072,21 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
 			
 			final DataDistribution dataDistribution = channel.getDataDistribution();
-			if(dataDistribution != null) {
+			if (dataDistribution != null) {
 				sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex);
 			} else {
 				throw new RuntimeException("Range partitioning requires data distribution");
 				// TODO: inject code and configuration for automatic histogram generation
 			}
 		}
-//		if (targetContract instanceof GenericDataSink) {
-//			final DataDistribution distri = ((GenericDataSink) targetContract).getDataDistribution();
-//			if (distri != null) {
-//				configForOutputShipStrategy.setOutputDataDistribution(distri);
-//			}
-//		}
+		
+		if (channel.getShipStrategy() == ShipStrategyType.PARTITION_CUSTOM) {
+			if (channel.getPartitioner() != null) {
+				sourceConfig.setOutputPartitioner(channel.getPartitioner(), outputIndex);
+			} else {
+				throw new CompilerException("The ship strategy was set to custom partitioning, but no partitioner was set.");
+			}
+		}
 		
 		// ---------------- configure the receiver -------------------
 		if (isBroadcast) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0f2db06/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
new file mode 100644
index 0000000..c308007
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Assert;
+
+@SuppressWarnings("serial")
+public class CustomPartitioningITCase extends JavaProgramTestBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		if (!isCollectionExecution()) {
+			Assert.assertTrue(env.getDegreeOfParallelism() > 1);
+		}
+		
+		env.generateSequence(1, 1000)
+			.partitionCustom(new AllZeroPartitioner(), new IdKeySelector<Long>())
+			.map(new FailExceptInPartitionZeroMapper())
+			.output(new DiscardingOutputFormat<Long>());
+		
+		env.execute();
+	}
+	
+	public static class FailExceptInPartitionZeroMapper extends RichMapFunction<Long, Long> {
+		
+		@Override
+		public Long map(Long value) throws Exception {
+			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+				return value;
+			} else {
+				throw new Exception("Received data in a partition other than partition 0");
+			}
+		}
+	}
+	
+	public static class AllZeroPartitioner implements Partitioner<Long> {
+		@Override
+		public int partition(Long key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	public static class IdKeySelector<T> implements KeySelector<T, T> {
+		@Override
+		public T getKey(T value) {
+			return value;
+		}
+	}
+}