You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/14 16:44:26 UTC

[4/4] git commit: Add Sanity Checks for Join/CoGroup with SolutionSet in Record API

Add Sanity Checks for Join/CoGroup with SolutionSet in Record API

We now check whether the key fields specified for the delta iterations
match those specified when joining/coGrouping with the solution set.
This code is more or less copied from the new Java API.

The Scala API uses the Record API underneath so it is also covered.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3856db89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3856db89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3856db89

Branch: refs/heads/release-0.6
Commit: 3856db89f22a1726570ad4fd9b259eca1062d9d4
Parents: e44fd09
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Aug 14 14:20:23 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Aug 14 16:43:46 2014 +0200

----------------------------------------------------------------------
 .../operators/base/DeltaIterationBase.java      |   2 +-
 .../java/record/operators/CoGroupOperator.java  |  10 +
 .../java/record/operators/DeltaIteration.java   |  10 +
 .../api/java/record/operators/JoinOperator.java |  10 +
 .../scala/DeltaIterationSanityCheckTest.scala   | 192 +++++++++++++++++++
 5 files changed, 223 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3856db89/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
index 8e955b1..a7dab4f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
@@ -282,7 +282,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
 	 */
 	public static class SolutionSetPlaceHolder<ST> extends Operator<ST> {
 
-		private final DeltaIterationBase<ST, ?> containingIteration;
+		protected final DeltaIterationBase<ST, ?> containingIteration;
 
 		public SolutionSetPlaceHolder(DeltaIterationBase<ST, ?> container, OperatorInformation<ST> operatorInfo) {
 			super(operatorInfo, "Solution Set Place Holder");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3856db89/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
index c958b84..0dc4acc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
@@ -98,6 +98,16 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
 		if (builder.inputs2 != null && !builder.inputs2.isEmpty()) {
 			setSecondInput(Operator.createUnionCascade(builder.inputs2));
 		}
+
+		// sanity check solution set key mismatches
+		if (input1 instanceof DeltaIteration.SolutionSetPlaceHolder) {
+			int[] positions = getKeyColumns(0);
+			((DeltaIteration.SolutionSetPlaceHolder) input1).checkJoinKeyFields(positions);
+		}
+		if (input2 instanceof DeltaIteration.SolutionSetPlaceHolder) {
+			int[] positions = getKeyColumns(1);
+			((DeltaIteration.SolutionSetPlaceHolder) input2).checkJoinKeyFields(positions);
+		}
 		
 		setBroadcastVariables(builder.broadcastInputs);
 		setGroupOrderForInputOne(builder.secondaryOrder1);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3856db89/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
index 7e9360c..ba2f456 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
@@ -19,11 +19,14 @@
 
 package org.apache.flink.api.java.record.operators;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.java.typeutils.RecordTypeInfo;
 import org.apache.flink.types.Record;
 
+import java.util.Arrays;
+
 /**
  * A DeltaIteration is similar to a {@link BulkIteration}, 
  * but maintains state across the individual iteration steps. The state is called the <i>solution set</i>, can be obtained
@@ -78,5 +81,12 @@ public class DeltaIteration extends DeltaIterationBase<Record, Record> {
 		public SolutionSetPlaceHolder(DeltaIterationBase<Record, ?> container) {
 			super(container, new OperatorInformation<Record>(new RecordTypeInfo()));
 		}
+
+		public void checkJoinKeyFields(int[] keyFields) { // throws InvalidProgramException unless keyFields equal the containing iteration's solution set key fields
+			int[] ssKeys = containingIteration.getSolutionSetKeyFields();
+			if (!Arrays.equals(ssKeys, keyFields)) { // element-wise comparison: same length, same positions, same order
+				throw new InvalidProgramException("The solution set can only be joined/co-grouped with the same keys as the elements are identified with (here: " + Arrays.toString(ssKeys) + ", but found: " + Arrays.toString(keyFields) + ").");
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3856db89/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
index 0ddb44a..f70de88 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
@@ -88,6 +88,16 @@ public class JoinOperator extends JoinOperatorBase<Record, Record, Record, JoinF
 		if (builder.inputs2 != null && !builder.inputs2.isEmpty()) {
 			setSecondInput(Operator.createUnionCascade(builder.inputs2));
 		}
+
+		// sanity check solution set key mismatches
+		if (input1 instanceof DeltaIteration.SolutionSetPlaceHolder) {
+			int[] positions = getKeyColumns(0);
+			((DeltaIteration.SolutionSetPlaceHolder) input1).checkJoinKeyFields(positions);
+		}
+		if (input2 instanceof DeltaIteration.SolutionSetPlaceHolder) {
+			int[] positions = getKeyColumns(1);
+			((DeltaIteration.SolutionSetPlaceHolder) input2).checkJoinKeyFields(positions);
+		}
 		
 		setBroadcastVariables(builder.broadcastInputs);
 		setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3856db89/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
new file mode 100644
index 0000000..b4927b4
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.junit.Test
+import org.apache.flink.api.common.InvalidProgramException
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.operators._
+import org.scalatest.junit.AssertionsForJUnit
+
+// Verify that the sanity checking in delta iterations works. We just
+// have a dummy job that is not meant to be executed. Only verify that
+// the join/coGroup inside the iteration is checked.
+class DeltaIterationSanityCheckTest extends Serializable { // in each step function, `s` is presumably the solution set and `ws` the workset
+
+  @Test // s is the left join input; join key _._1 equals the key given to iterateWithDelta: no exception expected
+  def testCorrectJoinWithSolution1 {
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output)) // the plan is only constructed here, never used or run
+  }
+
+  @Test // s is the right join input; join key _._1 equals the key given to iterateWithDelta: no exception expected
+  def testCorrectJoinWithSolution2 {
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = ws join s where {_._1} isEqualTo {_._1} map { (l, r) => l }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException]) // s on the left; join key _._2 differs from iteration key _._1
+  def testIncorrectJoinWithSolution1 {
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s join ws where {_._2} isEqualTo {_._2} map { (l, r) => l }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException]) // s on the right; join key _._2 differs from iteration key _._1
+  def testIncorrectJoinWithSolution2 {
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = ws join s where {_._2} isEqualTo {_._2} map { (l, r) => l }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException]) // join key is _._1 but the iteration key is _._2
+  def testIncorrectJoinWithSolution3 {
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+   }
+
+  @Test // s is the left coGroup input; coGroup key _._1 equals the key given to iterateWithDelta: no exception expected
+  def testCorrectCoGroupWithSolution1 {
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test // s is the right coGroup input; coGroup key _._1 equals the key given to iterateWithDelta: no exception expected
+  def testCorrectCoGroupWithSolution2 {
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = ws cogroup s where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException]) // s on the left; coGroup key _._2 differs from iteration key _._1
+  def testIncorrectCoGroupWithSolution1 {
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s cogroup ws where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException]) // s on the right; coGroup key _._2 differs from iteration key _._1
+  def testIncorrectCoGroupWithSolution2 {
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = ws cogroup s where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException]) // coGroup key is _._1 but the iteration key is _._2
+  def testIncorrectCoGroupWithSolution3 {
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+}