You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/12 23:03:55 UTC

[4/9] git commit: added own instantiate method to CoGroupDescriptorWithSolutionSetFirst; added Test

added own instantiate method to CoGroupDescriptorWithSolutionSetFirst; added Test


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f403970e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f403970e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f403970e

Branch: refs/heads/release-0.5.1
Commit: f403970eb737e703e85927d05ba4ac48a594310a
Parents: 387bb5d
Author: Sebastian Kunert <sk...@gmail.com>
Authored: Thu Jun 12 14:06:24 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:55 2014 +0200

----------------------------------------------------------------------
 .../compiler/operators/CoGroupDescriptor.java   |   2 +-
 .../CoGroupWithSolutionSetFirstDescriptor.java  |  22 +++-
 .../compiler/CoGroupSolutionSetFirstTest.java   | 103 +++++++++++++++++++
 3 files changed, 125 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
index e2614ea..edc6c69 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupDescriptor.java
@@ -128,7 +128,7 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 
 	@Override
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
-		boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections();
+		boolean[] inputOrders = in1.getLocalProperties().getOrdering() == null ? null : in1.getLocalProperties().getOrdering().getFieldSortDirections();
 		
 		if (inputOrders == null || inputOrders.length < this.keys1.size()) {
 			throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
index 90d0662..e8dd433 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/CoGroupWithSolutionSetFirstDescriptor.java
@@ -17,9 +17,14 @@ import java.util.Collections;
 import java.util.List;
 
 import eu.stratosphere.api.common.operators.util.FieldList;
+import eu.stratosphere.compiler.CompilerException;
+import eu.stratosphere.compiler.dag.TwoInputNode;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
 import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.DualInputPlanNode;
 import eu.stratosphere.compiler.util.Utils;
+import eu.stratosphere.pact.runtime.task.DriverStrategy;
 
 /**
  * 
@@ -36,7 +41,22 @@ public class CoGroupWithSolutionSetFirstDescriptor extends CoGroupDescriptor {
 		RequestedLocalProperties sort = new RequestedLocalProperties(Utils.createOrdering(this.keys2));
 		return Collections.singletonList(new LocalPropertiesPair(none, sort));
 	}
-	
+
+	@Override
+	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+		boolean[] inputOrders = in2.getLocalProperties().getOrdering() == null ? null : in2.getLocalProperties().getOrdering().getFieldSortDirections();
+
+		if (inputOrders == null || inputOrders.length < this.keys2.size()) {
+			throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a CoGroup operator.");
+		} else if (inputOrders.length > this.keys2.size()) {
+			boolean[] tmp = new boolean[this.keys2.size()];
+			System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
+			inputOrders = tmp;
+		}
+
+		return new DualInputPlanNode(node, "CoGroup ("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders);
+	}
+
 	@Override
 	public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
 			LocalProperties produced1, LocalProperties produced2)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f403970e/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
new file mode 100644
index 0000000..d0fca91
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CoGroupSolutionSetFirstTest.java
@@ -0,0 +1,103 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.CoGroupFunction;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.compiler.CompilerException;
+import eu.stratosphere.compiler.plan.Channel;
+import eu.stratosphere.compiler.plan.DualInputPlanNode;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.PlanNode;
+import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
+import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
+import eu.stratosphere.util.Collector;
+import eu.stratosphere.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
+	public static class SimpleCGroup extends CoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
+		@Override
+		public void coGroup(Iterator<Tuple1<Integer>> first, Iterator<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) throws Exception {
+		}
+	}
+
+	public static class SimpleMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+		@Override
+		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+			return null;
+		}
+	}
+
+	@Test
+	public void testCoGroupSolutionSet() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple1<Integer>> raw = env.readCsvFile(IN_FILE).types(Integer.class);
+
+		DeltaIteration<Tuple1<Integer>, Tuple1<Integer>> iteration = raw.iterateDelta(raw, 1000, 0);
+
+		DataSet<Tuple1<Integer>> test = iteration.getWorkset().map(new SimpleMap());
+		DataSet<Tuple1<Integer>> delta = iteration.getSolutionSet().coGroup(test).where(0).equalTo(0).with(new SimpleCGroup());
+		DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
+		DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback);
+
+		result.print();
+
+		Plan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = null;
+		try {
+			oPlan = compileNoStats(plan);
+		} catch(CompilerException e) {
+			Assert.fail(e.getMessage());
+		}
+
+		oPlan.accept(new Visitor<PlanNode>() {
+			@Override
+			public boolean preVisit(PlanNode visitable) {
+				System.out.println(visitable);
+				if (visitable instanceof WorksetIterationPlanNode) {
+					PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
+
+					//get the CoGroup
+					DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().next().getSource();
+					Channel in1 = dpn.getInput1();
+					Channel in2 = dpn.getInput2();
+
+					Assert.assertTrue(in1.getLocalProperties().getOrdering() == null);
+					Assert.assertTrue(in2.getLocalProperties().getOrdering() != null);
+					Assert.assertTrue(in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(0));
+					Assert.assertTrue(in1.getShipStrategy() == ShipStrategyType.FORWARD);
+					Assert.assertTrue(in2.getShipStrategy() == ShipStrategyType.PARTITION_HASH);
+					return false;
+				}
+				return true;
+			}
+
+			@Override
+			public void postVisit(PlanNode visitable) {
+
+			}
+		});
+
+	}
+
+}