You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/20 20:52:52 UTC

[1/2] incubator-flink git commit: [FLINK-820] [compiler] Support for disconnected data flows

Repository: incubator-flink
Updated Branches:
  refs/heads/master b3e5ed0ba -> 98ff76b0e


[FLINK-820] [compiler] Support for disconnected data flows


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/98ff76b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/98ff76b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/98ff76b0

Branch: refs/heads/master
Commit: 98ff76b0ea61a342250e15411edd9f7974cbe96d
Parents: d0f2db0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 20 16:43:18 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 20 19:14:47 2014 +0100

----------------------------------------------------------------------
 .../apache/flink/compiler/dag/SinkJoiner.java   | 36 +++++++-------
 .../compiler/BranchingPlansCompilerTest.java    | 47 +-----------------
 .../flink/compiler/DisjointDataFlowsTest.java   | 51 ++++++++++++++++++++
 .../test/misc/DisjointDataflowsITCase.java      | 37 ++++++++++++++
 4 files changed, 108 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
index b37c6ac..c153078 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.ArrayList;
@@ -24,7 +23,6 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.operators.OperatorDescriptorDual;
 import org.apache.flink.compiler.operators.UtilSinkJoinOpDescriptor;
@@ -74,23 +72,25 @@ public class SinkJoiner extends TwoInputNode {
 		
 		// if the predecessors do not have branches, then we have multiple sinks that do not originate from
 		// a common data flow.
-		if (pred1branches == null || pred1branches.isEmpty() || pred2branches == null || pred2branches.isEmpty()) {
-			throw new CompilerException("The given program contains multiple disconnected data flows.");
+		if (pred1branches == null || pred1branches.isEmpty()) {
+			
+			this.openBranches = (pred2branches == null || pred2branches.isEmpty()) ?
+					Collections.<UnclosedBranchDescriptor>emptyList() : // both empty - disconnected flow
+					pred2branches;
+		}
+		else if (pred2branches == null || pred2branches.isEmpty()) {
+			this.openBranches = pred1branches;
+		}
+		else {
+			// copy the lists and merge
+			List<UnclosedBranchDescriptor> result1 = new ArrayList<UnclosedBranchDescriptor>(pred1branches);
+			List<UnclosedBranchDescriptor> result2 = new ArrayList<UnclosedBranchDescriptor>(pred2branches);
+			
+			ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
+			mergeLists(result1, result2, result);
+			
+			this.openBranches = result.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
 		}
-		
-		// copy the lists and merge
-		List<UnclosedBranchDescriptor> result1 = new ArrayList<UnclosedBranchDescriptor>(pred1branches);
-		List<UnclosedBranchDescriptor> result2 = new ArrayList<UnclosedBranchDescriptor>(pred2branches);
-		
-		ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
-		mergeLists(result1, result2, result);
-		
-//		if (!didCloseSomeBranch) {
-//			// if the sink joiners do not close branches, then we have disjoint data flows.
-//			throw new CompilerException("The given program contains multiple disconnected data flows.");
-//		}
-		
-		this.openBranches = result.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index 196f602..2b4ff02 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -566,42 +566,6 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	/**
 	 * 
 	 * <pre>
-	 *           (SINK A)    (SINK B)
-	 *             /           /
-	 *         (SRC A)     (SRC B)
-	 * </pre>
-	 */
-	@Test
-	public void testSimpleDisjointPlan() {
-		// construct the plan
-		final String out1Path = "file:///test/1";
-		final String out2Path = "file:///test/2";
-
-		FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-		FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE);
-		
-		FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
-		FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB);
-		
-		List<FileDataSink> sinks = new ArrayList<FileDataSink>();
-		sinks.add(sinkA);
-		sinks.add(sinkB);
-		
-		// return the PACT plan
-		Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks");
-		
-		try {
-			compileNoStats(plan);
-			Assert.fail("Plan must not be compilable, it contains disjoint sub-plans.");
-		}
-		catch (Exception ex) {
-			// as expected
-		}
-	}
-	
-	/**
-	 * 
-	 * <pre>
 	 *     (SINK 3) (SINK 1)   (SINK 2) (SINK 4)
 	 *         \     /             \     /
 	 *         (SRC A)             (SRC B)
@@ -609,7 +573,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 	 * 
 	 * NOTE: this case is currently not caught by the compiler. we should enable the test once it is caught.
 	 */
-//	@Test (Deactivated for now because of unsupported feature)
+	@Test
 	public void testBranchingDisjointPlan() {
 		// construct the plan
 		final String out1Path = "file:///test/1";
@@ -634,14 +598,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		
 		// return the PACT plan
 		Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches");
-		
-		try {
-			compileNoStats(plan);
-			Assert.fail("Plan must not be compilable, it contains disjoint sub-plans.");
-		}
-		catch (Exception ex) {
-			// as expected
-		}
+		compileNoStats(plan);
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java
new file mode 100644
index 0000000..0c7bbef
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class DisjointDataFlowsTest extends CompilerTestBase {
+
+	/**
+	 * Checks that a program made of two flows sharing no operator can be optimized
+	 * and translated into a job graph. Any exception propagates to JUnit, which then
+	 * reports the complete stack trace instead of only the exception message.
+	 */
+	@Test
+	public void testDisjointFlows() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// generate two different flows
+		env.generateSequence(1, 10).print();
+		env.generateSequence(1, 10).print();
+		
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+		
+		// job graph generation must handle the disjoint plan as well, not only the optimizer
+		new NepheleJobGraphGenerator().compileJobGraph(op);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
new file mode 100644
index 0000000..f4d3d0b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class DisjointDataflowsITCase extends JavaProgramTestBase {
+
+	/** Executes one job made of two source-to-sink flows that share no operator. */
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// generate two different flows; sinks use the correctly spelled DiscardingOutputFormat
+		env.generateSequence(1, 10).output(new org.apache.flink.api.java.io.DiscardingOutputFormat<Long>());
+		env.generateSequence(1, 10).output(new org.apache.flink.api.java.io.DiscardingOutputFormat<Long>());
+		env.execute();
+	}
+}


[2/2] incubator-flink git commit: [FLINK-1264] [compiler] Properly forward custom partitioners to the runtime

Posted by se...@apache.org.
[FLINK-1264] [compiler] Properly forward custom partitioners to the runtime


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d0f2db06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d0f2db06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d0f2db06

Branch: refs/heads/master
Commit: d0f2db06e17d1a5d20adcf4f9ca555f4885774b9
Parents: b3e5ed0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 20 17:03:50 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 20 19:14:47 2014 +0100

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java | 16 +++--
 .../test/misc/CustomPartitioningITCase.java     | 73 ++++++++++++++++++++
 2 files changed, 82 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0f2db06/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 64eca7c..eb6fe2e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1072,19 +1072,21 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
 			
 			final DataDistribution dataDistribution = channel.getDataDistribution();
-			if(dataDistribution != null) {
+			if (dataDistribution != null) {
 				sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex);
 			} else {
 				throw new RuntimeException("Range partitioning requires data distribution");
 				// TODO: inject code and configuration for automatic histogram generation
 			}
 		}
-//		if (targetContract instanceof GenericDataSink) {
-//			final DataDistribution distri = ((GenericDataSink) targetContract).getDataDistribution();
-//			if (distri != null) {
-//				configForOutputShipStrategy.setOutputDataDistribution(distri);
-//			}
-//		}
+		
+		if (channel.getShipStrategy() == ShipStrategyType.PARTITION_CUSTOM) {
+			if (channel.getPartitioner() != null) {
+				sourceConfig.setOutputPartitioner(channel.getPartitioner(), outputIndex);
+			} else {
+				throw new CompilerException("The ship strategy was set to custom partitioning, but no partitioner was set.");
+			}
+		}
 		
 		// ---------------- configure the receiver -------------------
 		if (isBroadcast) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0f2db06/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
new file mode 100644
index 0000000..c308007
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Assert;
+
+@SuppressWarnings("serial")
+public class CustomPartitioningITCase extends JavaProgramTestBase {
+
+	/**
+	 * Routes all elements to partition 0 via a custom partitioner; the mapper fails if any
+	 * element reaches another subtask. Outside collection execution, parallelism must be > 1.
+	 */
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		if (!isCollectionExecution()) {
+			Assert.assertTrue(environment.getDegreeOfParallelism() > 1);
+		}
+		environment.generateSequence(1, 1000)
+			.partitionCustom(new AllZeroPartitioner(), new IdKeySelector<Long>())
+			.map(new FailExceptInPartitionZeroMapper())
+			.output(new DiscardingOutputFormat<Long>());
+		environment.execute();
+	}
+	
+	/** Passes elements through unchanged, but only in subtask 0; any other subtask throws. */
+	public static class FailExceptInPartitionZeroMapper extends RichMapFunction<Long, Long> {
+		@Override
+		public Long map(Long value) throws Exception {
+			if (getRuntimeContext().getIndexOfThisSubtask() != 0) {
+				throw new Exception("Received data in a partition other than partition 0");
+			}
+			return value;
+		}
+	}
+	
+	/** Sends every key to partition 0, regardless of the number of partitions. */
+	public static class AllZeroPartitioner implements Partitioner<Long> {
+		@Override
+		public int partition(Long key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	/** Uses each element as its own key. */
+	public static class IdKeySelector<T> implements KeySelector<T, T> {
+		@Override
+		public T getKey(T value) { return value; }
+	}
+}