You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:23 UTC

[41/60] git commit: [scala] Fix Formatting in Examples and add ITCases

[scala] Fix Formatting in Examples and add ITCases

Also actually use termination criterion in TransitiveClosureNaive
Java example.

Add ConnectedComponentsITCase for Scala Example

Also fix some formatting in the example code

Add WebLogAnalysisITCase for Scala Example

Some minor reformatting of example code and scaladoc.

Add ITCases for TriangleEnumeration Scala Examples

Also fix some formatting and make TriangleEnumerationOpt Scala produce the
same output as the Java version.

Add PageRankITCase for Scala Example

Also fix formatting in PageRank Scala Example.

Fix formatting in EnumTriangles Scala Examples

Remove Old/Deprecated Scala Examples and ITCases

Fix formatting in EnumTrianglesBasic.scala

Fix formatting in LinearRegression Scala Example

Remove old Scala LineRank Code and RelQuery Example

[scala] Fix typo in scaladoc in GroupedDataSet

[scala] Fix Scaladoc of Join and CoGroup Operation

Was still referring to the type of join/coGroup function that returns an
Option.

Fix tab vs. spaces in flink-scala and flink-scala-examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/31ed0c4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/31ed0c4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/31ed0c4c

Branch: refs/heads/master
Commit: 31ed0c4ccd1103d67d8e9331399f963764f38e33
Parents: a41a29b
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Sep 10 17:59:55 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:59 2014 +0200

----------------------------------------------------------------------
 .../java/graph/TransitiveClosureNaive.java      |   2 +-
 .../graph/util/ConnectedComponentsData.java     |  12 +-
 flink-examples/flink-scala-examples/pom.xml     |  54 +++---
 .../scala/graph/EnumTrianglesBasic.scala        |  43 +++--
 .../examples/scala/graph/EnumTrianglesOpt.scala |   2 +
 .../flink/examples/scala/graph/LineRank.scala   |  96 ----------
 .../scala/graph/TransitiveClosureNaive.scala    |   2 +-
 .../scala/iterative/TerminationCriterion.scala  |  78 ---------
 .../examples/scala/ml/LinearRegression.scala    | 175 +++++++------------
 .../scala/relational/RelationalQuery.scala      | 107 ------------
 .../examples/scala/testing/KMeansForTest.scala  | 105 -----------
 .../scala/wordcount/WordCountWithCount.scala    |  63 -------
 .../WordCountWithUserDefinedType.scala          |  59 -------
 flink-scala/pom.xml                             |  38 ++--
 .../org/apache/flink/api/scala/DataSet.scala    |  17 +-
 .../apache/flink/api/scala/GroupedDataSet.scala |   4 +-
 .../ComputeEdgeDegreesITCase.java               |  39 -----
 .../ConnectedComponentsITCase.java              |  94 ++++++----
 .../EnumTriangleBasicITCase.java                |  46 +++++
 .../EnumTriangleOptITCase.java                  |  46 +++++
 .../EnumTrianglesOnEdgesWithDegreesITCase.java  |  39 -----
 .../exampleScalaPrograms/PageRankITCase.java    |  99 +++++++++++
 .../RelationalQueryITCase.java                  |  49 ------
 .../TransitiveClosureNaiveITCase.java           |  54 ------
 .../WebLogAnalysisITCase.java                   |  81 +++++----
 .../WordCountPactValueITCase.java               |  33 ----
 .../WordCountWithCountFunctionITCase.java       |  31 ----
 27 files changed, 447 insertions(+), 1021 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
index 30230d6..9754a34 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
@@ -94,7 +94,7 @@ public class TransitiveClosureNaive implements ProgramDescription {
 					}
 				});
 
-		DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths);
+		DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths, newPaths);
 
 
 		// emit result

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
index 27c7d45..59df7fe 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
@@ -33,17 +33,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  */
 public class ConnectedComponentsData {
 	
-	public static final Object[][] VERTICES  = new Object[][] {
-		new Object[]{1L}, new Object[]{2L}, new Object[]{3L}, new Object[]{4L}, 
-		new Object[]{5L},new Object[]{6L}, new Object[]{7L}, new Object[]{8L}, 
-		new Object[]{9L}, new Object[]{10L}, new Object[]{11L}, new Object[]{12L}, 
-		new Object[]{13L}, new Object[]{14L}, new Object[]{15L}, new Object[]{16L}
-	};
+	public static final long[] VERTICES  = new long[] {
+			1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
 
 	public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {
 		List<Long> verticesList = new LinkedList<Long>();
-		for (Object[] vertex : VERTICES) {
-			verticesList.add((Long) vertex[0]);
+		for (long vertexId : VERTICES) {
+			verticesList.add(vertexId);
 		}
 		return env.fromCollection(verticesList);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml
index 3b50c8b..79053d6 100644
--- a/flink-examples/flink-scala-examples/pom.xml
+++ b/flink-examples/flink-scala-examples/pom.xml
@@ -38,11 +38,11 @@ under the License.
 			<artifactId>flink-scala</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java-examples</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 	</dependencies>
 
 	<build>
@@ -54,7 +54,7 @@ under the License.
 				<version>3.1.4</version>
 				<executions>
 					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-					    scala classes can be resolved later in the (Java) compile phase -->
+						scala classes can be resolved later in the (Java) compile phase -->
 					<execution>
 						<id>scala-compile-first</id>
 						<phase>process-resources</phase>
@@ -64,7 +64,7 @@ under the License.
 					</execution>
  
 					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-					     scala classes can be resolved later in the (Java) test-compile phase -->
+						 scala classes can be resolved later in the (Java) test-compile phase -->
 					<execution>
 						<id>scala-test-compile</id>
 						<phase>process-test-resources</phase>
@@ -237,7 +237,7 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
-		               -->
+					   -->
 
 					<execution>
 						<id>WordCount</id>
@@ -260,7 +260,7 @@ under the License.
 							</includes>
 						</configuration>
 					</execution>
-                    <!--
+					<!--
 					<execution>
 						<id>ConnectedComponents</id>
 						<phase>package</phase>
@@ -282,27 +282,27 @@ under the License.
 					
 					-->
 
-                    <execution>
-                        <id>TransitiveClosureNaive</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
+					<execution>
+						<id>TransitiveClosureNaive</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
 
-                        <configuration>
-                            <classifier>TransitiveClosureNaive</classifier>
+						<configuration>
+							<classifier>TransitiveClosureNaive</classifier>
 
-                            <archive>
-                                <manifestEntries>
-                                    <program-class>org.apache.flink.examples.scala.graph.TransitiveClosureNaive</program-class>
-                                </manifestEntries>
-                            </archive>
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.scala.graph.TransitiveClosureNaive</program-class>
+								</manifestEntries>
+							</archive>
 
-                            <includes>
-                                <include>**/wordcount/TransitiveClosureNaive*.class</include>
-                            </includes>
-                        </configuration>
-                    </execution>
+							<includes>
+								<include>**/wordcount/TransitiveClosureNaive*.class</include>
+							</includes>
+						</configuration>
+					</execution>
 
 				</executions>
 			</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
index c920c31..fe121d5 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala
@@ -24,27 +24,26 @@ import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.util.Collector
 import org.apache.flink.examples.java.graph.util.EnumTrianglesData
 import org.apache.flink.api.common.operators.Order
-import scala.collection.mutable.MutableList
+
+import scala.collection.mutable
 
 
 /**
  * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
  * A triangle consists of three edges that connect three vertices with each other.
  * 
- * <p>
- * The algorithm works as follows: 
+ * The algorithm works as follows:
  * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices 
  * that are connected by two edges. Finally, all triads are filtered for which no third edge exists 
  * that closes the triangle.
  *  
- * <p>
  * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Edges are represented as pairs for vertex IDs which are separated by space 
- * characters. Edges are separated by new-line characters.<br>
- * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
- * that include a triangle
- * </ul>
+ *
+ *  - Edges are represented as pairs for vertex IDs which are separated by space
+ *   characters. Edges are separated by new-line characters.
+ *   For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) edges (1)-(2), (2)-(12),
+ *   (1)-(12), and (42)-(63) that include a triangle
+ *
  * <pre>
  *     (1)
  *     /  \
@@ -59,13 +58,11 @@ import scala.collection.mutable.MutableList
  * If no parameters are provided, the program is run with default data from 
  * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]]
  * 
- * <p>
  * This example shows how to use:
- * <ul>
- * <li>Custom Java objects which extend Tuple
- * <li>Group Sorting
- * </ul>
- * 
+ *
+ *  - Custom Java objects which extend Tuple
+ *  - Group Sorting
+ *
  */
 object EnumTrianglesBasic {
 	
@@ -91,7 +88,7 @@ object EnumTrianglesBasic {
 		
 		// emit result
 		if (fileOutput) {
-			triangles.writeAsCsv(outputPath, "\n", " ")
+			triangles.writeAsCsv(outputPath, "\n", ",")
 		} else {
 			triangles.print()
 		}
@@ -119,12 +116,12 @@ object EnumTrianglesBasic {
 	 */
 	class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
 
-		val vertices = MutableList[Integer]()
+		val vertices = mutable.MutableList[Integer]()
 		
 		override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
 			
 			// clear vertex list
-			vertices.clear
+			vertices.clear()
 
 			// build and emit triads
 			for(e <- edges.asScala) {
@@ -153,10 +150,10 @@ object EnumTrianglesBasic {
 				false
 			}
 		} else {
-			System.out.println("Executing Enum Triangles Basic example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>");
+			System.out.println("Executing Enum Triangles Basic example with built-in default data.")
+			System.out.println("  Provide parameters to read input data from files.")
+			System.out.println("  See the documentation for the correct format of input files.")
+			System.out.println("  Usage: EnumTriangleBasic <edge path> <result path>")
 		}
 		true
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
index 80cce35..9370491 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala
@@ -26,6 +26,8 @@ import org.apache.flink.examples.java.graph.util.EnumTrianglesData
 import org.apache.flink.api.common.operators.Order
 import scala.collection.mutable.MutableList
 
+import scala.collection.mutable
+
 
 /**
  * Triangle enumeration is a pre-processing step to find closely connected parts in graphs.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/LineRank.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/LineRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/LineRank.scala
deleted file mode 100644
index 6902a6f..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/LineRank.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.examples.scala.graph
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.{ ProgramDescription, Program }
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//
-//
-//class LineRank extends Program with Serializable {
-//
-//  case class Edge(source: Int, target: Int, weight: Double)
-//  case class VectorElement(index: Int, value: Double)
-//
-//  override def getPlan(args: String*) = {
-//    getScalaPlan(args(0).toInt, args(1), args(2), args(3).toInt, args(4))
-//  }
-//
-//  def sumElements(elem1: VectorElement, elem2: VectorElement) = VectorElement(elem1.index, elem1.value + elem2.value)
-//
-//  def sgtTimes(SGT: DataSetOLD[Edge], vector: DataSetOLD[VectorElement]) = {
-//    SGT.join(vector).where(_.source).isEqualTo(_.index)
-//      .map((edge, elem) => VectorElement(edge.target, edge.weight * elem.value))
-//      .groupBy(_.index).reduce(sumElements)
-//  }
-//
-//  def tgTimes(TG: DataSetOLD[Edge], vector: DataSetOLD[VectorElement]) = {
-//    TG.join(vector).where(_.target).isEqualTo(_.index)
-//      .map((edge, elem) => VectorElement(edge.source, edge.weight * elem.value))
-//  }
-//
-//  def rescale(v3: DataSetOLD[VectorElement], c: Double, r: Double) = {
-//    v3.map(elem => { VectorElement(elem.index, c * elem.value + (1 - c) * r) })
-//  }
-//
-//  def powerMethod(SGT: DataSetOLD[Edge], TG: DataSetOLD[Edge], d: DataSetOLD[VectorElement], c: Double, r: Double)(v: DataSetOLD[VectorElement]) = {
-//
-//    val v1 = d.join(v).where(_.index).isEqualTo(_.index)
-//      .map((leftElem, rightElem) => VectorElement(leftElem.index, leftElem.value * rightElem.value))
-//
-//    val v2 = sgtTimes(SGT, v1)
-//    val v3 = tgTimes(TG, v2)
-//    val nextV = rescale(v3, c, r)
-//
-//    nextV
-//  }
-//
-//  def getScalaPlan(numSubTasks: Int, sourceIncidenceMatrixPath: String, targetIncidenceMatrixPath: String, m: Int,
-//    outputPath: String) = {
-//
-//    val c = .85
-//    val r = 1.0 / m
-//
-//    val SGT = DataSource(sourceIncidenceMatrixPath, CsvInputFormat[Edge]())
-//    val TG = DataSource(targetIncidenceMatrixPath, CsvInputFormat[Edge]())
-//
-//    val d1 = SGT.map(edge => VectorElement(edge.target, edge.weight))
-//      .groupBy(_.index)
-//      .reduce(sumElements)
-//
-//    val d2 = tgTimes(TG, d1)
-//
-//    val d = d2.map(elem => VectorElement(elem.index, 1 / elem.value))
-//
-//    val initialV1 = d.map(elem => VectorElement(elem.index, elem.value * m))
-//    val initialV2 = sgtTimes(SGT, initialV1)
-//    val initialV3 = tgTimes(TG, initialV2)
-//    val initialV = rescale(initialV3, c, r)
-//
-//    val v = initialV.iterate(5, powerMethod(SGT, TG, d, c, r))
-//
-//    val output = v.write(outputPath, CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(output), "LineRank")
-//    plan.setDefaultParallelism(numSubTasks)
-//    plan
-//  }
-//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index 25347ca..5416bb4 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -109,4 +109,4 @@ object TransitiveClosureNaive {
 			env.fromCollection(edgeData)
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/iterative/TerminationCriterion.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/iterative/TerminationCriterion.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/iterative/TerminationCriterion.scala
deleted file mode 100644
index b9f2264..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/iterative/TerminationCriterion.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.examples.scala.iterative
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//import org.apache.flink.api.common.ProgramDescription
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//
-///**
-// * Example of using the bulk iteration with termination criterion with the
-// * scala api.
-// */
-//class TerminationCriterion extends Program with ProgramDescription with Serializable {
-//  override def getDescription() = {
-//    "Parameters: <maxNumberIterations> <output>"
-//  }
-//
-//  override def getPlan(args: String*) = {
-//    getScalaPlan(args(0).toInt, args(1))
-//  }
-//
-//  def getScalaPlan(maxNumberIterations: Int, resultOutput: String) = {
-//    val dataSource = CollectionDataSource[Double](List(1.0))
-//
-//    val halve = (partialSolution: DataSetOLD[Double]) => {
-//      partialSolution map { x => x /2 }
-//    }
-//
-//    val terminationCriterion = (prev: DataSetOLD[Double], cur: DataSetOLD[Double]) => {
-//      val diff = prev cross cur map { (valuePrev, valueCurrent) => math.abs(valuePrev - valueCurrent) }
-//      diff filter {
-//        difference => difference > 0.1
-//      }
-//    }
-//
-//    val iteration = dataSource.iterateWithTermination(maxNumberIterations, halve, terminationCriterion)
-//
-//
-//    val sink = iteration.write(resultOutput, CsvOutputFormat())
-//
-//    val plan = new ScalaPlan(Seq(sink))
-//    plan.setDefaultParallelism(1)
-//    plan
-//  }
-//}
-//
-//object RunTerminationCriterion {
-//  def main(args: Array[String]) {
-//    val tc = new TerminationCriterion
-//
-//    if(args.size < 2) {
-//      println(tc.getDescription())
-//      return
-//    }
-//    val plan = tc.getScalaPlan(args(0).toInt, args(1))
-//    LocalExecutor.execute(plan)
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
index 3e66275..95dcb9a 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala
@@ -28,58 +28,56 @@ import org.apache.flink.examples.java.ml.util.LinearRegressionData
 import scala.collection.JavaConverters._
 
 /**
- * This example implements a basic Linear Regression  to solve the y = theta0 + theta1*x problem using batch gradient descent algorithm.
+ * This example implements a basic Linear Regression  to solve the y = theta0 + theta1*x problem
+ * using batch gradient descent algorithm.
  *
- * <p>
- * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:<br>
- * Giving a data set and target set, the BGD try to find out the best parameters for the data set to fit the target set.
- * In each iteration, the algorithm computes the gradient of the cost function and use it to update all the parameters.
- * The algorithm terminates after a fixed number of iterations (as in this implementation)
+ * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering
+ * algorithm and works as follows:
+ *
+ * Giving a data set and target set, the BGD try to find out the best parameters for the data set
+ * to fit the target set.
+ * In each iteration, the algorithm computes the gradient of the cost function and use it to
+ * update all the parameters.
+ * The algorithm terminates after a fixed number of iterations (as in this implementation).
  * With enough iteration, the algorithm can minimize the cost function and find the best parameters
- * This is the Wikipedia entry for the <a href = "http://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> and <a href = "http://en.wikipedia.org/wiki/Gradient_descent">Gradient descent algorithm</a>.
- * 
- * <p>
- * This implementation works on one-dimensional data. And find the two-dimensional theta.<br>
- * It find the best Theta parameter to fit the target.
- * 
- * <p>
+ * This is the Wikipedia entry for the
+ * [[http://en.wikipedia.org/wiki/Linear_regression Linear regression]] and
+ * [[http://en.wikipedia.org/wiki/Gradient_descent Gradient descent algorithm]].
+ *
+ * This implementation works on one-dimensional data and finds the best two-dimensional theta to
+ * fit the target.
+ *
  * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Data points are represented as two double values separated by a blank character. The first one represent the X(the training data) and the second represent the Y(target).
- * Data points are separated by newline characters.<br>
- * For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
- * </ul>
- * 
- * <p>
+ *
+ *  - Data points are represented as two double values separated by a blank character. The first
+ *    one represent the X(the training data) and the second represent the Y(target). Data points are
+ *    separated by newline characters.
+ *    For example `"-0.02 -0.04\n5.3 10.6\n"`gives two data points
+ *    (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
+ *
  * This example shows how to use:
- * <ul>
- * <li> Bulk iterations
- * <li> Broadcast variables in bulk iterations
- * <li> Custom Java objects (PoJos)
- * </ul>
+ *
+ *  - Bulk iterations
+ *  - Broadcast variables in bulk iterations
  */
 object LinearRegression {
 
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
 	def main(args: Array[String]) {
 		if (!parseParameters(args)) {
 			return
 		}
 
 		val env = ExecutionEnvironment.getExecutionEnvironment
-		val data: DataSet[Data] = getDataSet(env)
-		val parameters: DataSet[Params] = getParamsDataSet(env)
+		val data = getDataSet(env)
+		val parameters = getParamsDataSet(env)
+
 		val result = parameters.iterate(numIterations) { currentParameters =>
 			val newParameters = data
 				.map(new SubUpdate).withBroadcastSet(currentParameters, "parameters")
-				.reduce { (val1, val2) =>
-				val new_theta0: Double = val1._1.getTheta0 + val2._1.getTheta0
-				val new_theta1: Double = val1._1.getTheta1 + val2._1.getTheta1
-				val result: Params = new Params(new_theta0, new_theta1)
-				(result, val1._2 + val2._2)
-			}
+				.reduce { (p1, p2) =>
+          val result = p1._1 + p2._1
+				  (result, p1._2 + p2._2)
+			  }
 				.map { x => x._1.div(x._2) }
 			newParameters
 		}
@@ -88,73 +86,28 @@ object LinearRegression {
 			result.writeAsText(outputPath)
 		}
 		else {
-			result.print
+			result.print()
 		}
 		env.execute("Scala Linear Regression example")
 	}
 
-	// *************************************************************************
-	//     DATA TYPES
-	// *************************************************************************
 	/**
 	 * A simple data sample, x means the input, and y means the target.
 	 */
-	class Data extends Serializable {
-
-		def this(x: Double, y: Double) {
-			this()
-			this.x = x
-			this.y = y
-		}
-
-		override def toString: String = {
-			"(" + x + "|" + y + ")"
-		}
-
-		var x: Double = .0
-		var y: Double = .0
-	}
+  case class Data(var x: Double, var y: Double)
 
 	/**
 	 * A set of parameters -- theta0, theta1.
 	 */
-	class Params extends Serializable {
+  case class Params(theta0: Double, theta1: Double) {
+    def div(a: Int): Params = {
+      Params(theta0 / a, theta1 / a)
+    }
 
-		def this(x0: Double, x1: Double) {
-			this()
-			this.theta0 = x0
-			this.theta1 = x1
-		}
-
-		override def toString: String = {
-			theta0 + " " + theta1
-		}
-
-		def getTheta0: Double = {
-			theta0
-		}
-
-		def getTheta1: Double = {
-			theta1
-		}
-
-		def setTheta0(theta0: Double) {
-			this.theta0 = theta0
-		}
-
-		def setTheta1(theta1: Double) {
-			this.theta1 = theta1
-		}
-
-		def div(a: Integer): Params = {
-			this.theta0 = theta0 / a
-			this.theta1 = theta1 / a
-			return this
-		}
-
-		private var theta0: Double = .0
-		private var theta1: Double = .0
-	}
+    def +(other: Params) = {
+      Params(theta0 + other.theta0, theta1 + other.theta1)
+    }
+  }
 
 	// *************************************************************************
 	//     USER FUNCTIONS
@@ -163,24 +116,22 @@ object LinearRegression {
 	/**
 	 * Compute a single BGD type update for every parameters.
 	 */
-	class SubUpdate extends RichMapFunction[Data, Tuple2[Params, Integer]] {
+	class SubUpdate extends RichMapFunction[Data, (Params, Int)] {
 
-		private var parameters: Traversable[Params] = null
-		var parameter: Params = null
-		private var count: Int = 1
+		private var parameter: Params = null
 
 		/** Reads the parameters from a broadcast variable into a collection. */
 		override def open(parameters: Configuration) {
-			this.parameters = getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala
+			val parameters = getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala
+      parameter = parameters.head
 		}
 
-		def map(in: Data): Tuple2[Params, Integer] = {
-			for (p <- parameters) {
-				this.parameter = p
-			}
-			val theta_0: Double = parameter.getTheta0 - 0.01 * ((parameter.getTheta0 + (parameter.getTheta1 * in.x)) - in.y)
-			val theta_1: Double = parameter.getTheta1 - 0.01 * (((parameter.getTheta0 + (parameter.getTheta1 * in.x)) - in.y) * in.x)
-			new Tuple2[Params, Integer](new Params(theta_0, theta_1), count)
+		def map(in: Data): (Params, Int) = {
+			val theta0 =
+        parameter.theta0 - 0.01 * ((parameter.theta0 + (parameter.theta1 * in.x)) - in.y)
+			val theta1 =
+        parameter.theta1 - 0.01 * (((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) * in.x)
+			(Params(theta0, theta1), 1)
 		}
 	}
 
@@ -198,7 +149,7 @@ object LinearRegression {
 			if (programArguments.length == 3) {
 				dataPath = programArguments(0)
 				outputPath = programArguments(1)
-				numIterations = Integer.parseInt(programArguments(2))
+				numIterations = programArguments(2).toInt
 			}
 			else {
 				System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>")
@@ -206,11 +157,13 @@ object LinearRegression {
 			}
 		}
 		else {
-			System.out.println("Executing Linear Regression example with default parameters and built-in default data.")
-			System.out.println("  Provide parameters to read input data from files.")
-			System.out.println("  See the documentation for the correct format of input files.")
-			System.out.println("  We provide a data generator to create synthetic input files for this program.")
-			System.out.println("  Usage: LinearRegression <data path> <result path> <num iterations>")
+      System.out.println("Executing Linear Regression example with default parameters and " +
+        "built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  We provide a data generator to create synthetic input files for this " +
+        "program.")
+      System.out.println("  Usage: LinearRegression <data path> <result path> <num iterations>")
 		}
 		true
 	}
@@ -225,7 +178,7 @@ object LinearRegression {
 		}
 		else {
 			val data = LinearRegressionData.DATA map {
-				case Array(x, y) => new Data(x.asInstanceOf[Double], y.asInstanceOf[Double])
+				case Array(x, y) => Data(x.asInstanceOf[Double], y.asInstanceOf[Double])
 			}
 			env.fromCollection(data)
 		}
@@ -233,7 +186,7 @@ object LinearRegression {
 
 	private def getParamsDataSet(env: ExecutionEnvironment): DataSet[Params] = {
 		val params = LinearRegressionData.PARAMS map {
-			case Array(x, y) => new Params(x.asInstanceOf[Double], y.asInstanceOf[Double])
+			case Array(x, y) => Params(x.asInstanceOf[Double], y.asInstanceOf[Double])
 		}
 		env.fromCollection(params)
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala
deleted file mode 100644
index 5e7d7f3..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.examples.scala.relational;
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//import org.apache.flink.api.common.ProgramDescription
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//
-//
-///**
-// * The TPC-H is a decision support benchmark on relational data.
-// * Its documentation and the data generator (DBGEN) can be found
-// * on http://www.tpc.org/tpch/ .
-// *
-// * This Flink program implements a modified version of the query 3 of
-// * the TPC-H benchmark including one join, some filtering and an
-// * aggregation. The query resembles the following SQL statement:
-// * <pre>
-// * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
-// *   FROM orders, lineitem
-// *   WHERE l_orderkey = o_orderkey
-// *     AND o_orderstatus = "X"
-// *     AND YEAR(o_orderdate) > Y
-// *     AND o_orderpriority LIKE "Z%"
-// *   GROUP BY l_orderkey, o_shippriority;
-// * </pre>
-// */
-//class RelationalQuery extends Program with ProgramDescription with Serializable {
-//
-//  case class Order(orderId: Int, status: Char, year: Int, orderPriority: String, shipPriority: Int)
-//  case class LineItem(orderId: Int, extendedPrice: Double)
-//  case class PrioritizedOrder(orderId: Int, shipPriority: Int, revenue: Double)
-//
-//
-//  def getScalaPlan(numSubTasks: Int, ordersInput: String, lineItemsInput: String, ordersOutput: String, status: Char = 'F', minYear: Int = 1993, priority: String = "5") = {
-//
-//    // ORDER intput: parse as CSV and select relevant fields
-//    val orders = DataSource(ordersInput, CsvInputFormat[(Int, String, String, String, String, String, String, Int)]("\n", '|'))
-//                         .map { t => Order(t._1, t._3.charAt(0), t._5.substring(0,4).toInt, t._6, t._8) }
-//
-//    // ORDER intput: parse as CSV and select relevant fields
-//    val lineItems = DataSource(lineItemsInput, CsvInputFormat[(Int, String, String, String, String, Double)]("\n", '|'))
-//                         .map { t => LineItem(t._1, t._6) }
-//
-//    // filter the orders input
-//    val filteredOrders = orders filter { o => o.status == status && o.year > minYear && o.orderPriority.startsWith(priority) }
-//
-//    // join the filteres result with the lineitem input
-//    val prioritizedItems = filteredOrders join lineItems where { _.orderId } isEqualTo { _.orderId } map { (o, li) => PrioritizedOrder(o.orderId, o.shipPriority, li.extendedPrice) }
-//
-//    // group by and sum the joined data
-//    val prioritizedOrders = prioritizedItems groupBy { pi => (pi.orderId, pi.shipPriority) } reduce { (po1, po2) => po1.copy(revenue = po1.revenue + po2.revenue) }
-//
-//    // write the result as csv
-//    val output = prioritizedOrders.write(ordersOutput, CsvOutputFormat("\n", "|"))
-//
-//    val plan = new ScalaPlan(Seq(output), "Relational Query")
-//    plan.setDefaultParallelism(numSubTasks)
-//    plan
-//  }
-//
-//  override def getDescription() = {
-//    "Parameters: <orders>, <lineitem>, <output>, <degree-of-parallelism>"
-//  }
-//  override def getPlan(args: String*) = {
-//    getScalaPlan(args(3).toInt, args(0), args(1), args(2))
-//  }
-//}
-//
-//
-///**
-// * Entry point to make the example standalone runnable with the local executor
-// */
-//object RunRelationalQuery {
-//
-//  def main(args: Array[String]) {
-//    val query = new RelationalQuery
-//
-//    if (args.size < 4) {
-//      println(query.getDescription)
-//      return
-//    }
-//    val plan = query.getScalaPlan(args(3).toInt, args(0), args(1), args(2))
-//    LocalExecutor.execute(plan)
-//  }
-//}
-//

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/testing/KMeansForTest.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/testing/KMeansForTest.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/testing/KMeansForTest.scala
deleted file mode 100644
index 52bfc15..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/testing/KMeansForTest.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.examples.scala.testing
-//
-//// Uncomment if you need to rebuild it for PackagedProgramEndToEndTest
-////
-////import org.apache.flink.api.common.Program
-////import org.apache.flink.api.common.ProgramDescription
-////
-////import org.apache.flink.api.scala._
-////import org.apache.flink.api.scala.operators._
-////
-////
-////class KMeansForTest extends Program with ProgramDescription {
-////
-////  override def getPlan(args: String*) = {
-////    getScalaPlan(args(0).toInt, args(1), args(2), args(3), args(4).toInt)
-////  }
-////
-////  case class Point(x: Double, y: Double, z: Double) {
-////    def computeEuclidianDistance(other: Point) = other match {
-////      case Point(x2, y2, z2) => math.sqrt(math.pow(x - x2, 2) + math.pow(y - y2, 2) + math.pow(z - z2, 2))
-////    }
-////  }
-////
-////  case class Distance(dataPoint: Point, clusterId: Int, distance: Double)
-////
-////  def asPointSum = (pid: Int, dist: Distance) => dist.clusterId -> PointSum(1, dist.dataPoint)
-////
-////  //  def sumPointSums = (dataPoints: Iterator[(Int, PointSum)]) => dataPoints.reduce { (z, v) => z.copy(_2 = z._2 + v._2) }
-////  def sumPointSums = (dataPoints: Iterator[(Int, PointSum)]) => {
-////    dataPoints.reduce { (z, v) => z.copy(_2 = z._2 + v._2) }
-////  }
-////
-////
-////  case class PointSum(count: Int, pointSum: Point) {
-////    def +(that: PointSum) = that match {
-////      case PointSum(c, Point(x, y, z)) => PointSum(count + c, Point(x + pointSum.x, y + pointSum.y, z + pointSum.z))
-////    }
-////
-////    def toPoint() = Point(round(pointSum.x / count), round(pointSum.y / count), round(pointSum.z / count))
-////
-////    // Rounding ensures that we get the same results in a multi-iteration run
-////    // as we do in successive single-iteration runs, since the output format
-////    // only contains two decimal places.
-////    private def round(d: Double) = math.round(d * 100.0) / 100.0;
-////  }
-////
-////  def parseInput = (line: String) => {
-////    val PointInputPattern = """(\d+)\|-?(\d+\.\d+)\|-?(\d+\.\d+)\|-?(\d+\.\d+)\|""".r
-////    val PointInputPattern(id, x, y, z) = line
-////    (id.toInt, Point(x.toDouble, y.toDouble, z.toDouble))
-////  }
-////
-////  def formatOutput = (cid: Int, p: Point) => "%d|%.2f|%.2f|%.2f|".format(cid, p.x, p.y, p.z)
-////
-////  def computeDistance(p: (Int, Point), c: (Int, Point)) = {
-////    val ((pid, dataPoint), (cid, clusterPoint)) = (p, c)
-////    val distToCluster = dataPoint.computeEuclidianDistance(clusterPoint)
-////
-////    pid -> Distance(dataPoint, cid, distToCluster)
-////  }
-////
-////
-////  def getScalaPlan(numSubTasks: Int, dataPointInput: String, clusterInput: String, clusterOutput: String, numIterations: Int) = {
-////    val dataPoints = DataSource(dataPointInput, DelimitedInputFormat(parseInput))
-////    val clusterPoints = DataSource(clusterInput, DelimitedInputFormat(parseInput))
-////
-////    val finalCenters = clusterPoints.iterate(numIterations, { centers =>
-////
-////      val distances = dataPoints cross centers map computeDistance
-////      val nearestCenters = distances groupBy { case (pid, _) => pid } reduceGroup { ds => ds.minBy(_._2.distance) } map asPointSum.tupled
-////      val newCenters = nearestCenters groupBy { case (cid, _) => cid } reduceGroup sumPointSums map { case (cid, pSum) => cid -> pSum.toPoint() }
-////
-////      newCenters
-////    })
-////
-////    val output = finalCenters.write(clusterOutput, DelimitedOutputFormat(formatOutput.tupled))
-////
-////    val plan = new ScalaPlan(Seq(output), "KMeans Iteration (ONLY FOR TESTING)")
-////    plan.setDefaultParallelism(numSubTasks)
-////    plan
-////  }
-////
-////  override def getDescription() = {
-////    "Parameters: [numSubStasksS] [dataPoints] [clusterCenters] [output] [numIterations]"
-////  }
-////}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithCount.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithCount.scala
deleted file mode 100644
index f71b18d..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithCount.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.examples.scala.wordcount
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//import org.apache.flink.api.common.ProgramDescription
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//
-//
-///**
-// * Implementation of word count in Scala. This example uses the built in count function for tuples.
-// */
-//class WordCountWithCount extends WordCount {
-//
-//  override def getScalaPlan(numSubTasks: Int, textInput: String, wordsOutput: String) = {
-//    val input = TextFile(textInput)
-//
-//    val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } }
-//    val counts = words groupBy { x => x } count()
-//
-//    val output = counts.write(wordsOutput, CsvOutputFormat("\n", " "))
-//
-//    val plan = new ScalaPlan(Seq(output), "Word Count")
-//    plan.setDefaultParallelism(numSubTasks)
-//    plan
-//  }
-//}
-//
-//
-///**
-// * Entry point to make the example standalone runnable with the local executor.
-// */
-//object RunWordCountWithCount {
-//  def main(args: Array[String]) {
-//    val wc = new WordCountWithCount
-//    if (args.size < 3) {
-//      println(wc.getDescription)
-//      return
-//    }
-//    val plan = wc.getScalaPlan(args(0).toInt, args(1), args(2))
-//    LocalExecutor.execute(plan)
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithUserDefinedType.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithUserDefinedType.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithUserDefinedType.scala
deleted file mode 100644
index 2ee0c43..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCountWithUserDefinedType.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.examples.scala.wordcount
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//import org.apache.flink.api.common.ProgramDescription
-//
-//import org.apache.flink.types.IntValue
-//import org.apache.flink.types.StringValue
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//
-//
-///**
-// * Implementation of word count in Scala, using a user defined type rather than one of the
-// * built-in supported types like primitives, tuples, or other (nested) case classes.
-// */
-//class WordCountWithUserDefinedType extends Program with Serializable {
-//
-//  def getScalaPlan(numSubTasks: Int, textInput: String, wordsOutput: String) = {
-//    val input = TextFile(textInput)
-//
-//    val words = input flatMap { _.toLowerCase().split("""\W+""") filter { _ != "" } map { w => (new StringValue(w), new IntValue(1)) } }
-//
-//    val counts = words
-//      .groupBy { case (word, _) => word }
-//      .reduce { (w1, w2) => (w1._1, new IntValue(w1._2.getValue + w2._2.getValue)) }
-//
-//    val output = counts.write(wordsOutput, CsvOutputFormat("\n", " "))
-//
-//    val plan = new ScalaPlan(Seq(output), "Word Count (immutable)")
-//    plan.setDefaultParallelism(numSubTasks)
-//    plan
-//  }
-//
-//
-//  override def getPlan(args: String*) = {
-//    getScalaPlan(args(0).toInt, args(1), args(2))
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index 45db390..dfd9419 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -39,23 +39,23 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
@@ -108,7 +108,7 @@ under the License.
 				<version>3.1.4</version>
 				<executions>
 					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-					    scala classes can be resolved later in the (Java) compile phase -->
+						scala classes can be resolved later in the (Java) compile phase -->
 					<execution>
 						<id>scala-compile-first</id>
 						<phase>process-resources</phase>
@@ -118,7 +118,7 @@ under the License.
 					</execution>
  
 					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-					     scala classes can be resolved later in the (Java) test-compile phase -->
+						 scala classes can be resolved later in the (Java) test-compile phase -->
 					<execution>
 						<id>scala-test-compile</id>
 						<phase>process-test-resources</phase>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index d04f968..e973093 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -635,14 +635,12 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
    *   val left: DataSet[(String, Int, Int)] = ...
    *   val right: DataSet[(Int, String, Int)] = ...
    *   val joined = left.join(right).where(0).isEqualTo(1) { (l, r) =>
-   *     if (l._2 > 4) {
-   *       Some((l._2, r._3))
-   *     } else {
-   *       None
-   *     }
+   *     (l._1, r._2)
    *   }
    * }}}
-   * This can be used to implement a filter directly in the join or to output more than one values:
+   * A join function with a [[Collector]] can be used to implement a filter directly in the join
+   * or to output more than one value. This type of join function does not return a value;
+   * instead, values are emitted using the collector:
    * {{{
    *   val left: DataSet[(String, Int, Int)] = ...
    *   val right: DataSet[(Int, String, Int)] = ...
@@ -696,11 +694,12 @@ class DataSet[T: ClassTag](private[flink] val set: JavaDataSet[T]) {
    *   val right: DataSet[(Int, String, Int)] = ...
    *   val coGrouped = left.coGroup(right).where(0).isEqualTo(1) { (l, r) =>
    *     // l and r are of type TraversableOnce
-   *     Some((l.min, r.max))
+   *     (l.min, r.max)
    *   }
    * }}}
-   * This can be used to implement a filter directly in the coGroup or to output more than one
-   * values:
+   * A coGroup function with a [[Collector]] can be used to implement a filter directly in the
+   * coGroup or to output more than one value. This type of coGroup function does not return a
+   * value; instead, values are emitted using the collector:
    * {{{
    *   val left: DataSet[(String, Int, Int)] = ...
    *   val right: DataSet[(Int, String, Int)] = ...

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index a7ca821..802fd09 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -144,11 +144,9 @@ trait GroupedDataSet[T] {
 }
 
 /**
- * /**
  * Private implementation for [[GroupedDataSet]] to keep the implementation details, i.e. the
  * parameters of the constructor, hidden.
  */
- */
 private[flink] class GroupedDataSetImpl[T: ClassTag](
     private val set: JavaDataSet[T],
     private val keys: Keys[T])
@@ -256,7 +254,7 @@ private[flink] class GroupedDataSetImpl[T: ClassTag](
   }
 
   def reduceGroup[R: TypeInformation: ClassTag](
-                                                 fun: (TraversableOnce[T]) => R): DataSet[R] = {
+      fun: (TraversableOnce[T]) => R): DataSet[R] = {
     Validate.notNull(fun, "Group reduce function must not be null.")
     val reducer = new GroupReduceFunction[T, R] {
       def reduce(in: java.lang.Iterable[T], out: Collector[R]) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java
deleted file mode 100644
index 2426bc9..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ComputeEdgeDegreesITCase.java
+++ /dev/null
@@ -1,39 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.test.exampleScalaPrograms;
-//
-//import org.apache.flink.api.common.Plan;
-//import org.apache.flink.configuration.Configuration;
-//import org.apache.flink.examples.scala.graph.ComputeEdgeDegrees;
-//
-//public class ComputeEdgeDegreesITCase extends org.apache.flink.test.recordJobTests.ComputeEdgeDegreesITCase {
-//
-//	public ComputeEdgeDegreesITCase(Configuration config) {
-//		super(config);
-//	}
-//
-//	@Override
-//	protected Plan getTestJob() {
-//		ComputeEdgeDegrees computeDegrees = new ComputeEdgeDegrees();
-//		return computeDegrees.getScalaPlan(
-//				config.getInteger("ComputeEdgeDegreesTest#NumSubtasks", DOP),
-//				edgesPath, resultPath);
-//	}
-//}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java
index 69a5c9a..71a7e23 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/ConnectedComponentsITCase.java
@@ -1,34 +1,60 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.test.exampleScalaPrograms;
-//
-//import org.apache.flink.api.common.Plan;
-//import org.apache.flink.examples.scala.graph.ConnectedComponents;
-//
-//public class ConnectedComponentsITCase extends org.apache.flink.test.iterative.ConnectedComponentsITCase {
-//
-//	@Override
-//	protected Plan getTestJob() {
-//		ConnectedComponents cc = new ConnectedComponents();
-//		Plan plan = cc.getScalaPlan(verticesPath, edgesPath, resultPath, 100);
-//		plan.setDefaultParallelism(DOP);
-//		return plan;
-//	}
-//}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.exampleScalaPrograms;
+
+import org.apache.flink.examples.scala.graph.ConnectedComponents;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.io.BufferedReader;
+
+public class ConnectedComponentsITCase extends JavaProgramTestBase {
+	
+	private static final long SEED = 0xBADC0FFEEBEEFL;
+	
+	private static final int NUM_VERTICES = 1000;
+	
+	private static final int NUM_EDGES = 10000;
+
+	
+	private String verticesPath;
+	private String edgesPath;
+	private String resultPath;
+	
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
+		edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
+		resultPath = getTempFilePath("results");
+	}
+	
+	@Override
+	protected void testProgram() throws Exception {
+		ConnectedComponents.main(new String[] {verticesPath, edgesPath, resultPath, "100"});
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader reader : getResultReader(resultPath)) {
+			ConnectedComponentsData.checkOddEvenResult(reader);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java
new file mode 100644
index 0000000..5c19876
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleBasicITCase.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.exampleScalaPrograms;
+
+import org.apache.flink.examples.scala.graph.EnumTrianglesBasic;
+import org.apache.flink.test.testdata.EnumTriangleData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class EnumTriangleBasicITCase extends JavaProgramTestBase {
+	
+	protected String edgePath;
+	protected String resultPath;
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		edgePath = createTempFile("edges", EnumTriangleData.EDGES);
+		resultPath = getTempDirPath("triangles");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_ID, resultPath);
+	}
+	
+	@Override
+	protected void testProgram() throws Exception {
+		EnumTrianglesBasic.main(new String[] { edgePath, resultPath });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
new file mode 100644
index 0000000..944aaf4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTriangleOptITCase.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.exampleScalaPrograms;
+
+import org.apache.flink.examples.scala.graph.EnumTrianglesOpt;
+import org.apache.flink.test.testdata.EnumTriangleData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class EnumTriangleOptITCase extends JavaProgramTestBase {
+	
+	protected String edgePath;
+	protected String resultPath;
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		edgePath = createTempFile("edges", EnumTriangleData.EDGES);
+		resultPath = getTempDirPath("triangles");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_DEGREE, resultPath);
+	}
+	
+	@Override
+	protected void testProgram() throws Exception {
+		EnumTrianglesOpt.main(new String[] { edgePath, resultPath });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java
deleted file mode 100644
index ab8c563..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/EnumTrianglesOnEdgesWithDegreesITCase.java
+++ /dev/null
@@ -1,39 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.test.exampleScalaPrograms;
-//
-//import org.apache.flink.api.common.Plan;
-//import org.apache.flink.configuration.Configuration;
-//import org.apache.flink.examples.scala.graph.EnumTrianglesOnEdgesWithDegrees;
-//
-//public class EnumTrianglesOnEdgesWithDegreesITCase extends org.apache.flink.test.recordJobTests.EnumTrianglesOnEdgesWithDegreesITCase {
-//
-//	public EnumTrianglesOnEdgesWithDegreesITCase(Configuration config) {
-//		super(config);
-//	}
-//
-//	@Override
-//	protected Plan getTestJob() {
-//		EnumTrianglesOnEdgesWithDegrees enumTriangles = new EnumTrianglesOnEdgesWithDegrees();
-//		return enumTriangles.getScalaPlan(
-//				config.getInteger("EnumTrianglesTest#NumSubtasks", DOP),
-//				edgesPath, resultPath);
-//	}
-//}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
new file mode 100644
index 0000000..2369a4b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/PageRankITCase.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.exampleScalaPrograms;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.scala.graph.PageRankBasic;
+import org.apache.flink.test.testdata.PageRankData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase extends JavaProgramTestBase {
+	
+	private static int NUM_PROGRAMS = 2;
+	
+	private int curProgId = config.getInteger("ProgramId", -1);
+	
+	private String verticesPath;
+	private String edgesPath;
+	private String resultPath;
+	private String expectedResult;
+	
+	public PageRankITCase(Configuration config) {
+		super(config);
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+		verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
+		edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = runProgram(curProgId);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+
+	public String runProgram(int progId) throws Exception {
+		
+		switch(progId) {
+		case 1: {
+			PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
+			return PageRankData.RANKS_AFTER_3_ITERATIONS;
+		}
+		case 2: {
+			// start with a very high number of iterations so that the dynamic convergence criterion must handle termination
+			PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
+			return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
+		}
+		
+		default: 
+			throw new IllegalArgumentException("Invalid program id");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/RelationalQueryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/RelationalQueryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/RelationalQueryITCase.java
deleted file mode 100644
index ca5a707..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/RelationalQueryITCase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.test.exampleScalaPrograms;
-//
-//import org.apache.flink.api.common.Plan;
-//import org.apache.flink.configuration.Configuration;
-//import org.apache.flink.examples.scala.relational.RelationalQuery;
-//import org.junit.runner.RunWith;
-//import org.junit.runners.Parameterized;
-//
-//import java.util.Locale;
-//
-//@RunWith(Parameterized.class)
-//public class RelationalQueryITCase extends org.apache.flink.test.recordJobTests.TPCHQuery3ITCase {
-//
-//	public RelationalQueryITCase(Configuration config) {
-//		super(config);
-//		Locale.setDefault(Locale.US);
-//	}
-//
-//	@Override
-//	protected Plan getTestJob()  {
-//
-//		RelationalQuery tpch3 = new RelationalQuery();
-//		return tpch3.getScalaPlan(
-//				config.getInteger("dop", 1),
-//				ordersPath,
-//				lineitemsPath,
-//				resultPath,
-//				'F', 1993, "5");
-//	}
-//}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
deleted file mode 100644
index 4d0eb24..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureNaiveITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.test.exampleScalaPrograms;
-//
-//import org.apache.flink.api.common.Plan;
-//import org.apache.flink.examples.scala.graph.TransitiveClosureNaive;
-//import org.apache.flink.test.util.RecordAPITestBase;
-//
-//public class TransitiveClosureNaiveITCase extends RecordAPITestBase {
-//
-//	protected String verticesPath = null;
-//	protected String edgesPath = null;
-//	protected String resultPath = null;
-//
-//	private static final String VERTICES = "0\n1\n2";
-//	private static final String EDGES = "0|1\n1|2";
-//	private static final String EXPECTED = "0|0|0\n0|1|1\n0|2|2\n1|1|0\n1|2|1\n2|2|0";
-//
-//	@Override
-//	protected void preSubmit() throws Exception {
-//		verticesPath = createTempFile("vertices.txt", VERTICES);
-//		edgesPath = createTempFile("edges.txt", EDGES);
-//		resultPath = getTempDirPath("transitiveClosure");
-//	}
-//
-//	@Override
-//	protected Plan getTestJob() {
-//		TransitiveClosureNaive transitiveClosureNaive = new TransitiveClosureNaive();
-//		// "2" is the number of iterations here
-//		return transitiveClosureNaive.getScalaPlan(DOP, 2, verticesPath, edgesPath, resultPath);
-//	}
-//
-//	@Override
-//	protected void postSubmit() throws Exception {
-//		compareResultsByLinesInMemory(EXPECTED, resultPath);
-//	}
-//}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
index 63c598c..2d8ad31 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WebLogAnalysisITCase.java
@@ -1,32 +1,49 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.test.exampleScalaPrograms;
-//
-//import org.apache.flink.api.common.Plan;
-//import org.apache.flink.examples.scala.relational.WebLogAnalysis;
-//
-//public class WebLogAnalysisITCase extends org.apache.flink.test.recordJobTests.WebLogAnalysisITCase {
-//
-//	@Override
-//	protected Plan getTestJob() {
-//		WebLogAnalysis webLogAnalysis = new WebLogAnalysis();
-//		return webLogAnalysis.getScalaPlan(DOP, docsPath, ranksPath, visitsPath, resultPath);
-//	}
-//}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.exampleScalaPrograms;
+
+
+import org.apache.flink.examples.scala.relational.WebLogAnalysis;
+import org.apache.flink.test.testdata.WebLogAnalysisData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class WebLogAnalysisITCase extends JavaProgramTestBase {
+
+	private String docsPath;
+	private String ranksPath;
+	private String visitsPath;
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
+		ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
+		visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
+	}
+	@Override
+	protected void testProgram() throws Exception {
+		WebLogAnalysis.main(new String[]{docsPath, ranksPath, visitsPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountPactValueITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountPactValueITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountPactValueITCase.java
deleted file mode 100644
index 94ec224..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountPactValueITCase.java
+++ /dev/null
@@ -1,33 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.test.exampleScalaPrograms;
-//
-//import org.apache.flink.api.common.Plan;
-//import org.apache.flink.examples.scala.wordcount.WordCountWithUserDefinedType;
-//
-//
-//public class WordCountPactValueITCase extends org.apache.flink.test.recordJobTests.WordCountITCase {
-//
-//	@Override
-//	protected Plan getTestJob() {
-//		WordCountWithUserDefinedType wc = new WordCountWithUserDefinedType();
-//		return wc.getScalaPlan(DOP, textPath, resultPath);
-//	}
-//}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/31ed0c4c/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
deleted file mode 100644
index 5f53f72..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountWithCountFunctionITCase.java
+++ /dev/null
@@ -1,31 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.test.exampleScalaPrograms;
-//
-//import org.apache.flink.api.common.Plan;
-//import org.apache.flink.examples.scala.wordcount.WordCountWithCount;
-//
-//public class WordCountWithCountFunctionITCase extends org.apache.flink.test.recordJobTests.WordCountITCase {
-//
-//	@Override
-//	protected Plan getTestJob() {
-//		return new WordCountWithCount().getScalaPlan(DOP, textPath, resultPath);
-//	}
-//}