You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/14 16:44:23 UTC

[1/4] git commit: [FLINK-1051] ./bin/start-local.bat prints error message if java.exe cannot be found

Repository: incubator-flink
Updated Branches:
  refs/heads/release-0.6 e44fd095c -> 87dd7c7fb


[FLINK-1051] ./bin/start-local.bat prints error message if java.exe cannot be found


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/87dd7c7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/87dd7c7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/87dd7c7f

Branch: refs/heads/release-0.6
Commit: 87dd7c7fb484941a86a697bd73db38a55ec2a274
Parents: 8fd70ae
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Aug 14 16:05:06 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Aug 14 16:43:46 2014 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/start-local.bat | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/87dd7c7f/flink-dist/src/main/flink-bin/bin/start-local.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat b/flink-dist/src/main/flink-bin/bin/start-local.bat
index 7e0b28a..e77b40c 100644
--- a/flink-dist/src/main/flink-bin/bin/start-local.bat
+++ b/flink-dist/src/main/flink-bin/bin/start-local.bat
@@ -48,6 +48,11 @@ RENAME "%outname%" "%outname%.0"  2> nul
 DEL "%logname%.6"  2> nul
 DEL "%outname%.6"  2> nul
 
+for %%X in (java.exe) do (set FOUND=%%~$PATH:X)
+if not defined FOUND (
+    echo java.exe was not found in PATH variable
+    goto :eof
+)
 
 echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
 echo Don't close this batch window. Stop job manager by pressing Ctrl+C.


[2/4] git commit: [FLINK-1047] Fixes main class for start-local.bat

Posted by rm...@apache.org.
[FLINK-1047] Fixes main class for start-local.bat


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6a43d249
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6a43d249
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6a43d249

Branch: refs/heads/release-0.6
Commit: 6a43d249302cc0b3ef70ea4bf599e54491e1c7a0
Parents: 3856db8
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Aug 14 15:44:24 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Aug 14 16:43:46 2014 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/start-local.bat | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6a43d249/flink-dist/src/main/flink-bin/bin/start-local.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat b/flink-dist/src/main/flink-bin/bin/start-local.bat
index 23a6d74..7e0b28a 100644
--- a/flink-dist/src/main/flink-bin/bin/start-local.bat
+++ b/flink-dist/src/main/flink-bin/bin/start-local.bat
@@ -52,6 +52,6 @@ DEL "%outname%.6"  2> nul
 echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
 echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
 
-java %JVM_ARGS% %log_setting% -cp %NEPHELE_JM_CLASSPATH% org.apache.flink.nephele.jobmanager.JobManager -executionMode local -configDir %NEPHELE_CONF_DIR%  > "%out%"  2>&1
+java %JVM_ARGS% %log_setting% -cp %NEPHELE_JM_CLASSPATH% org.apache.flink.runtime.jobmanager.JobManager -executionMode local -configDir %NEPHELE_CONF_DIR%  > "%out%"  2>&1
 
 endlocal


[3/4] git commit: [FLINK-1048] Swallow output of SET statements in ./bin/flink.bat

Posted by rm...@apache.org.
[FLINK-1048] Swallow output of SET statements in ./bin/flink.bat


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8fd70aee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8fd70aee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8fd70aee

Branch: refs/heads/release-0.6
Commit: 8fd70aeed20e40119d52d0a0a42bff16bf31f329
Parents: 6a43d24
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Aug 14 15:45:59 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Aug 14 16:43:46 2014 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/flink.bat | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8fd70aee/flink-dist/src/main/flink-bin/bin/flink.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink.bat b/flink-dist/src/main/flink-bin/bin/flink.bat
index 2275e07..a51baca 100644
--- a/flink-dist/src/main/flink-bin/bin/flink.bat
+++ b/flink-dist/src/main/flink-bin/bin/flink.bat
@@ -16,6 +16,7 @@
 :: limitations under the License.
 ::###############################################################################
 
+@echo off
 setlocal
 
 SET bin=%~dp0


[4/4] git commit: Add Sanity Checks for Join/CoGroup with SolutionSet in Record API

Posted by rm...@apache.org.
Add Sanity Checks for Join/CoGroup with SolutionSet in Record API

We now check whether the solution-set key fields specified for a delta
iteration match those used when joining/coGrouping with the solution set.
This code is more or less copied from the new Java API.

The Scala API uses the Record API underneath, so it is also covered.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3856db89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3856db89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3856db89

Branch: refs/heads/release-0.6
Commit: 3856db89f22a1726570ad4fd9b259eca1062d9d4
Parents: e44fd09
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Aug 14 14:20:23 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Aug 14 16:43:46 2014 +0200

----------------------------------------------------------------------
 .../operators/base/DeltaIterationBase.java      |   2 +-
 .../java/record/operators/CoGroupOperator.java  |  10 +
 .../java/record/operators/DeltaIteration.java   |  10 +
 .../api/java/record/operators/JoinOperator.java |  10 +
 .../scala/DeltaIterationSanityCheckTest.scala   | 192 +++++++++++++++++++
 5 files changed, 223 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3856db89/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
index 8e955b1..a7dab4f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
@@ -282,7 +282,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
 	 */
 	public static class SolutionSetPlaceHolder<ST> extends Operator<ST> {
 
-		private final DeltaIterationBase<ST, ?> containingIteration;
+		protected final DeltaIterationBase<ST, ?> containingIteration;
 
 		public SolutionSetPlaceHolder(DeltaIterationBase<ST, ?> container, OperatorInformation<ST> operatorInfo) {
 			super(operatorInfo, "Solution Set Place Holder");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3856db89/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
index c958b84..0dc4acc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
@@ -98,6 +98,16 @@ public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record,
 		if (builder.inputs2 != null && !builder.inputs2.isEmpty()) {
 			setSecondInput(Operator.createUnionCascade(builder.inputs2));
 		}
+
+		// sanity check solution set key mismatches
+		if (input1 instanceof DeltaIteration.SolutionSetPlaceHolder) {
+			int[] positions = getKeyColumns(0);
+			((DeltaIteration.SolutionSetPlaceHolder) input1).checkJoinKeyFields(positions);
+		}
+		if (input2 instanceof DeltaIteration.SolutionSetPlaceHolder) {
+			int[] positions = getKeyColumns(1);
+			((DeltaIteration.SolutionSetPlaceHolder) input2).checkJoinKeyFields(positions);
+		}
 		
 		setBroadcastVariables(builder.broadcastInputs);
 		setGroupOrderForInputOne(builder.secondaryOrder1);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3856db89/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
index 7e9360c..ba2f456 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/DeltaIteration.java
@@ -19,11 +19,14 @@
 
 package org.apache.flink.api.java.record.operators;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.OperatorInformation;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
 import org.apache.flink.api.java.typeutils.RecordTypeInfo;
 import org.apache.flink.types.Record;
 
+import java.util.Arrays;
+
 /**
  * A DeltaIteration is similar to a {@link BulkIteration}, 
  * but maintains state across the individual iteration steps. The state is called the <i>solution set</i>, can be obtained
@@ -78,5 +81,12 @@ public class DeltaIteration extends DeltaIterationBase<Record, Record> {
 		public SolutionSetPlaceHolder(DeltaIterationBase<Record, ?> container) {
 			super(container, new OperatorInformation<Record>(new RecordTypeInfo()));
 		}
+
+		public void checkJoinKeyFields(int[] keyFields) { // throws InvalidProgramException unless keyFields equals the iteration's solution-set key fields
+			int[] ssKeys = containingIteration.getSolutionSetKeyFields();
+			if (!Arrays.equals(ssKeys, keyFields)) { // element-wise, order-sensitive comparison of the key positions
+				throw new InvalidProgramException("The solution set can only be joined/co-grouped with the same keys as the elements are identified with (here: " + Arrays.toString(ssKeys) + ", given: " + Arrays.toString(keyFields) + ").");
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3856db89/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
index 0ddb44a..f70de88 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
@@ -88,6 +88,16 @@ public class JoinOperator extends JoinOperatorBase<Record, Record, Record, JoinF
 		if (builder.inputs2 != null && !builder.inputs2.isEmpty()) {
 			setSecondInput(Operator.createUnionCascade(builder.inputs2));
 		}
+
+		// sanity check solution set key mismatches
+		if (input1 instanceof DeltaIteration.SolutionSetPlaceHolder) {
+			int[] positions = getKeyColumns(0);
+			((DeltaIteration.SolutionSetPlaceHolder) input1).checkJoinKeyFields(positions);
+		}
+		if (input2 instanceof DeltaIteration.SolutionSetPlaceHolder) {
+			int[] positions = getKeyColumns(1);
+			((DeltaIteration.SolutionSetPlaceHolder) input2).checkJoinKeyFields(positions);
+		}
 		
 		setBroadcastVariables(builder.broadcastInputs);
 		setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3856db89/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
new file mode 100644
index 0000000..b4927b4
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.junit.Test
+import org.apache.flink.api.common.InvalidProgramException
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.operators._
+import org.scalatest.junit.AssertionsForJUnit
+
+// Verifies the delta-iteration sanity check: a join/coGroup with the solution
+// set must use the same key fields the iteration identifies solution-set
+// elements by. Each test only builds a dummy plan; nothing is executed.
+class DeltaIterationSanityCheckTest extends Serializable {
+
+  @Test
+  def testCorrectJoinWithSolution1 { // solution set as 1st join input, keyed and joined on _1: accepted
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test
+  def testCorrectJoinWithSolution2 { // solution set as 2nd join input, keyed and joined on _1: accepted
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = ws join s where {_._1} isEqualTo {_._1} map { (l, r) => l }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution1 { // solution set as 1st join input, keyed on _1 but joined on _2: must throw
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s join ws where {_._2} isEqualTo {_._2} map { (l, r) => l }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution2 { // solution set as 2nd join input, keyed on _1 but joined on _2: must throw
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = ws join s where {_._2} isEqualTo {_._2} map { (l, r) => l }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution3 { // solution set as 1st join input, keyed on _2 but joined on _1: must throw
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s join ws where {_._1} isEqualTo {_._1} map { (l, r) => l }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+   }
+
+  @Test
+  def testCorrectCoGroupWithSolution1 { // solution set as 1st coGroup input, keyed and coGrouped on _1: accepted
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test
+  def testCorrectCoGroupWithSolution2 { // solution set as 2nd coGroup input, keyed and coGrouped on _1: accepted
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = ws cogroup s where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution1 { // solution set as 1st coGroup input, keyed on _1 but coGrouped on _2: must throw
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s cogroup ws where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution2 { // solution set as 2nd coGroup input, keyed on _1 but coGrouped on _2: must throw
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = ws cogroup s where {_._2} isEqualTo {_._2} map { (l, r) => l.next() }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._1}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution3 { // solution set as 1st coGroup input, keyed on _2 but coGrouped on _1: must throw
+    val solutionInput = CollectionDataSource(Array((1, "1")))
+    val worksetInput = CollectionDataSource(Array((2, "2")))
+
+    def step(s: DataSet[(Int, String)], ws: DataSet[(Int, String)]) = {
+      val result = s cogroup ws where {_._1} isEqualTo {_._1} map { (l, r) => l.next() }
+      (result, ws)
+    }
+    val iteration = solutionInput.iterateWithDelta(worksetInput, {_._2}, step, 10)
+
+    val output = iteration.write("/dummy", CsvOutputFormat())
+
+    val plan = new ScalaPlan(Seq(output))
+  }
+}