You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/12 23:03:55 UTC
[4/9] git commit: added own instantiate method to
CoGroupDescriptorWithSolutionSetFirst; added Test
added own instantiate method to CoGroupDescriptorWithSolutionSetFirst; added Test
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f403970e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f403970e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f403970e
Branch: refs/heads/release-0.5.1
Commit: f403970eb737e703e85927d05ba4ac48a594310a
Parents: 387bb5d
Author: Sebastian Kunert <sk...@gmail.com>
Authored: Thu Jun 12 14:06:24 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:55 2014 +0200
----------------------------------------------------------------------
.../compiler/operators/CoGroupDescriptor.java | 2 +-
.../CoGroupWithSolutionSetFirstDescriptor.java | 22 +++-
.../compiler/CoGroupSolutionSetFirstTest.java | 103 +++++++++++++++++++
3 files changed, 125 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
index e2614ea..edc6c69 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
@@ -128,7 +128,7 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
- boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections();
+ boolean[] inputOrders = in1.getLocalProperties().getOrdering() == null ? null : in1.getLocalProperties().getOrdering().getFieldSortDirections();
if (inputOrders == null || inputOrders.length < this.keys1.size()) {
throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator.");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
index 90d0662..e8dd433 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
@@ -17,9 +17,14 @@ import java.util.Collections;
import java.util.List;
import eu.stratosphere.api.common.operators.util.FieldList;
+import eu.stratosphere.compiler.CompilerException;
+import eu.stratosphere.compiler.dag.TwoInputNode;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.DualInputPlanNode;
import eu.stratosphere.compiler.util.Utils;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
/**
*
@@ -36,7 +41,22 @@ public class CoGroupWithSolutionSetFirstDescriptor extends CoGroupDescriptor {
RequestedLocalProperties sort = new RequestedLocalProperties(Utils.createOrdering(this.keys2));
return Collections.singletonList(new LocalPropertiesPair(none, sort));
}
-
+
+ @Override
+ public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+ boolean[] inputOrders = in2.getLocalProperties().getOrdering() == null ? null : in2.getLocalProperties().getOrdering().getFieldSortDirections();
+
+ if (inputOrders == null || inputOrders.length < this.keys2.size()) {
+ throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator.");
+ } else if (inputOrders.length > this.keys2.size()) {
+ boolean[] tmp = new boolean[this.keys2.size()];
+ System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
+ inputOrders = tmp;
+ }
+
+ return new DualInputPlanNode(node, "CoGroup ("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
+ }
+
@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
new file mode 100644
index 0000000..d0fca91
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
@@ -0,0 +1,103 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.CoGroupFunction;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.compiler.CompilerException;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.DualInputPlanNode;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.PlanNode;
+import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.util.Collector;
+import eu.stratosphere.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
+ public static class SimpleCGroup extends CoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
+ @Override
+ public void coGroup(Iterator<Tuple1<Integer>> first, Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws Exception {
+ }
+ }
+
+ public static class SimpleMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+ @Override
+ public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+ return null;
+ }
+ }
+
+ @Test
+ public void testCoGroupSolutionSet() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple1<Integer>> raw = env.readCsvFile(IN_FILE).types(Integer.class);
+
+ DeltaIteration<Tuple1<Integer>, Tuple1<Integer>> iteration = raw.iterateDelta(raw, 1000, 0);
+
+ DataSet<Tuple1<Integer>> test = iteration.getWorkset().map(new SimpleMap());
+ DataSet<Tuple1<Integer>> delta = iteration.getSolutionSet().coGroup(test).where(0).equalTo(0).with(new SimpleCGroup());
+ DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
+ DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback);
+
+ result.print();
+
+ Plan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = null;
+ try {
+ oPlan = compileNoStats(plan);
+ } catch(CompilerException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ oPlan.accept(new Visitor<PlanNode>() {
+ @Override
+ public boolean preVisit(PlanNode visitable) {
+ System.out.println(visitable);
+ if (visitable instanceof WorksetIterationPlanNode) {
+ PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
+
+ //get the CoGroup
+ DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().next().getSource();
+ Channel in1 = dpn.getInput1();
+ Channel in2 = dpn.getInput2();
+
+ Assert.assertTrue(in1.getLocalProperties().getOrdering() == null);
+ Assert.assertTrue(in2.getLocalProperties().getOrdering() != null);
+ Assert.assertTrue(in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(0));
+ Assert.assertTrue(in1.getShipStrategy() == ShipStrategyType.FORWARD);
+ Assert.assertTrue(in2.getShipStrategy() == ShipStrategyType.PARTITION_HASH);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void postVisit(PlanNode visitable) {
+
+ }
+ });
+
+ }
+
+}