You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/20 20:52:52 UTC
[1/2] incubator-flink git commit: [FLINK-820] [compiler] Support for disconnected data flows
Repository: incubator-flink
Updated Branches:
refs/heads/master b3e5ed0ba -> 98ff76b0e
[FLINK-820] [compiler] Support for disconnected data flows
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/98ff76b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/98ff76b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/98ff76b0
Branch: refs/heads/master
Commit: 98ff76b0ea61a342250e15411edd9f7974cbe96d
Parents: d0f2db0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 20 16:43:18 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 20 19:14:47 2014 +0100
----------------------------------------------------------------------
.../apache/flink/compiler/dag/SinkJoiner.java | 36 +++++++-------
.../compiler/BranchingPlansCompilerTest.java | 47 +-----------------
.../flink/compiler/DisjointDataFlowsTest.java | 51 ++++++++++++++++++++
.../test/misc/DisjointDataflowsITCase.java | 37 ++++++++++++++
4 files changed, 108 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
index b37c6ac..c153078 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SinkJoiner.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.dag;
import java.util.ArrayList;
@@ -24,7 +23,6 @@ import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
-import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.operators.OperatorDescriptorDual;
import org.apache.flink.compiler.operators.UtilSinkJoinOpDescriptor;
@@ -74,23 +72,25 @@ public class SinkJoiner extends TwoInputNode {
// if the predecessors do not have branches, then we have multiple sinks that do not originate from
// a common data flow.
- if (pred1branches == null || pred1branches.isEmpty() || pred2branches == null || pred2branches.isEmpty()) {
- throw new CompilerException("The given program contains multiple disconnected data flows.");
+ if (pred1branches == null || pred1branches.isEmpty()) {
+
+ this.openBranches = (pred2branches == null || pred2branches.isEmpty()) ?
+ Collections.<UnclosedBranchDescriptor>emptyList() : // both empty - disconnected flow
+ pred2branches;
+ }
+ else if (pred2branches == null || pred2branches.isEmpty()) {
+ this.openBranches = pred1branches;
+ }
+ else {
+ // copy the lists and merge
+ List<UnclosedBranchDescriptor> result1 = new ArrayList<UnclosedBranchDescriptor>(pred1branches);
+ List<UnclosedBranchDescriptor> result2 = new ArrayList<UnclosedBranchDescriptor>(pred2branches);
+
+ ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
+ mergeLists(result1, result2, result);
+
+ this.openBranches = result.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
}
-
- // copy the lists and merge
- List<UnclosedBranchDescriptor> result1 = new ArrayList<UnclosedBranchDescriptor>(pred1branches);
- List<UnclosedBranchDescriptor> result2 = new ArrayList<UnclosedBranchDescriptor>(pred2branches);
-
- ArrayList<UnclosedBranchDescriptor> result = new ArrayList<UnclosedBranchDescriptor>();
- mergeLists(result1, result2, result);
-
-// if (!didCloseSomeBranch) {
-// // if the sink joiners do not close branches, then we have disjoint data flows.
-// throw new CompilerException("The given program contains multiple disconnected data flows.");
-// }
-
- this.openBranches = result.isEmpty() ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index 196f602..2b4ff02 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -566,42 +566,6 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
/**
*
* <pre>
- * (SINK A) (SINK B)
- * / /
- * (SRC A) (SRC B)
- * </pre>
- */
- @Test
- public void testSimpleDisjointPlan() {
- // construct the plan
- final String out1Path = "file:///test/1";
- final String out2Path = "file:///test/2";
-
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
- FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE);
-
- FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
- FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB);
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sinkA);
- sinks.add(sinkB);
-
- // return the PACT plan
- Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks");
-
- try {
- compileNoStats(plan);
- Assert.fail("Plan must not be compilable, it contains disjoint sub-plans.");
- }
- catch (Exception ex) {
- // as expected
- }
- }
-
- /**
- *
- * <pre>
* (SINK 3) (SINK 1) (SINK 2) (SINK 4)
* \ / \ /
* (SRC A) (SRC B)
@@ -609,7 +573,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
*
* NOTE: this case is currently not caught by the compiler. we should enable the test once it is caught.
*/
-// @Test (Deactivated for now because of unsupported feature)
+ @Test
public void testBranchingDisjointPlan() {
// construct the plan
final String out1Path = "file:///test/1";
@@ -634,14 +598,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
// return the PACT plan
Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches");
-
- try {
- compileNoStats(plan);
- Assert.fail("Plan must not be compilable, it contains disjoint sub-plans.");
- }
- catch (Exception ex) {
- // as expected
- }
+ compileNoStats(plan);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java
new file mode 100644
index 0000000..0c7bbef
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DisjointDataFlowsTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.junit.Test;
+
+/**
+ * Tests that the optimizer and the job graph generator accept a program consisting of
+ * several data flows that share no operator (disjoint data flows).
+ */
+@SuppressWarnings("serial")
+public class DisjointDataFlowsTest extends CompilerTestBase {
+
+	/**
+	 * Builds a program with two independent source-to-sink flows and checks that it can be
+	 * optimized and translated into a job graph. Exceptions are propagated so that JUnit
+	 * reports them with their full stack trace.
+	 */
+	@Test
+	public void testDisjointFlows() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// generate two flows that share no operator
+		env.generateSequence(1, 10).print();
+		env.generateSequence(1, 10).print();
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		new NepheleJobGraphGenerator().compileJobGraph(op);
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98ff76b0/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
new file mode 100644
index 0000000..f4d3d0b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Integration test that runs a program made of two data flows which share no operator,
+ * submitting both of them through a single {@code env.execute()} call.
+ */
+public class DisjointDataflowsITCase extends JavaProgramTestBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// generate two flows that share no operator; their output is discarded
+		env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
+		env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
+
+		env.execute();
+	}
+}
[2/2] incubator-flink git commit: [FLINK-1264] [compiler] Properly forward custom partitioners to the runtime
Posted by se...@apache.org.
[FLINK-1264] [compiler] Properly forward custom partitioners to the runtime
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d0f2db06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d0f2db06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d0f2db06
Branch: refs/heads/master
Commit: d0f2db06e17d1a5d20adcf4f9ca555f4885774b9
Parents: b3e5ed0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 20 17:03:50 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 20 19:14:47 2014 +0100
----------------------------------------------------------------------
.../plantranslate/NepheleJobGraphGenerator.java | 16 +++--
.../test/misc/CustomPartitioningITCase.java | 73 ++++++++++++++++++++
2 files changed, 82 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0f2db06/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 64eca7c..eb6fe2e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1072,19 +1072,21 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
final DataDistribution dataDistribution = channel.getDataDistribution();
- if(dataDistribution != null) {
+ if (dataDistribution != null) {
sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex);
} else {
throw new RuntimeException("Range partitioning requires data distribution");
// TODO: inject code and configuration for automatic histogram generation
}
}
-// if (targetContract instanceof GenericDataSink) {
-// final DataDistribution distri = ((GenericDataSink) targetContract).getDataDistribution();
-// if (distri != null) {
-// configForOutputShipStrategy.setOutputDataDistribution(distri);
-// }
-// }
+
+ if (channel.getShipStrategy() == ShipStrategyType.PARTITION_CUSTOM) {
+ if (channel.getPartitioner() != null) {
+ sourceConfig.setOutputPartitioner(channel.getPartitioner(), outputIndex);
+ } else {
+ throw new CompilerException("The ship strategy was set to custom partitioning, but no partitioner was set.");
+ }
+ }
// ---------------- configure the receiver -------------------
if (isBroadcast) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d0f2db06/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
new file mode 100644
index 0000000..c308007
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomPartitioningITCase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Assert;
+
+/**
+ * Integration test checking that a custom {@link Partitioner} set with
+ * {@code partitionCustom(...)} takes effect at runtime: the partitioner routes every record
+ * to partition 0, and the downstream mapper fails in any other parallel instance that
+ * receives data.
+ */
+@SuppressWarnings("serial")
+public class CustomPartitioningITCase extends JavaProgramTestBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// with a single parallel instance every record reaches instance 0 regardless of the
+		// partitioner, so the test would not detect a partitioner that gets lost
+		if (!isCollectionExecution()) {
+			int dop = env.getDegreeOfParallelism();
+			Assert.assertTrue("This test requires a degree of parallelism > 1, but it is " + dop, dop > 1);
+		}
+
+		env.generateSequence(1, 1000)
+			.partitionCustom(new AllZeroPartitioner(), new IdKeySelector<Long>())
+			.map(new FailExceptInPartitionZeroMapper())
+			.output(new DiscardingOutputFormat<Long>());
+
+		env.execute();
+	}
+
+	/** Identity mapper that throws if any record reaches a parallel instance other than 0. */
+	public static class FailExceptInPartitionZeroMapper extends RichMapFunction<Long, Long> {
+
+		@Override
+		public Long map(Long value) throws Exception {
+			int subtask = getRuntimeContext().getIndexOfThisSubtask();
+			if (subtask == 0) {
+				return value;
+			}
+			throw new Exception("Received data in a partition other than partition 0 (value "
+					+ value + " arrived in partition " + subtask + ")");
+		}
+	}
+
+	/** Partitioner that assigns every key to partition 0. */
+	public static class AllZeroPartitioner implements Partitioner<Long> {
+		@Override
+		public int partition(Long key, int numPartitions) {
+			return 0;
+		}
+	}
+
+	/** Key selector that uses the record itself as its key. */
+	public static class IdKeySelector<T> implements KeySelector<T, T> {
+		@Override
+		public T getKey(T value) {
+			return value;
+		}
+	}
+}