You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/03/29 13:50:40 UTC

[11/12] flink git commit: [FLINK-947] Add parser to Expression API for exposing it to Java

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java
new file mode 100644
index 0000000..89ec2e5
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class SelectITCase extends MultipleProgramsTestBase {
+
+
+	public SelectITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private String resultPath;
+	private String expected = "";
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testSimpleSelectAllWithAs() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a,b,c");
+
+		ExpressionOperation<JavaBatchTranslator> result = in
+				.select("a, b, c");
+
+		DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+				"how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+				"Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+				"Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+				"Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+				"6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+
+	}
+
+	@Test
+	public void testSimpleSelectWithNaming() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds);
+
+		ExpressionOperation<JavaBatchTranslator> result = in
+				.select("f0 as a, f1 as b")
+				.select("a, b");
+
+		DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithToFewFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b");
+
+		DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = " sorry dude ";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithToManyFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b, c, d");
+
+		DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = " sorry dude ";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testAsWithAmbiguousFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		// Three names for the three tuple fields, with "b" repeated: the duplicate is the only error.
+		ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b, b");
+
+		DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+		// Only reached when the expected ExpressionException was not thrown.
+		expected = " today's not your day ";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testOnlyFieldRefInAs() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b as c, d");
+
+		DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "sorry bro";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java
new file mode 100644
index 0000000..f9f1c6b
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class StringExpressionsITCase extends MultipleProgramsTestBase {
+
+
+	public StringExpressionsITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private String resultPath;
+	private String expected = "";
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testSubstring() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+				new Tuple2<String, Integer>("AAAA", 2),
+				new Tuple2<String, Integer>("BBBB", 1));
+
+		ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b");
+
+		ExpressionOperation<JavaBatchTranslator> result = in
+				.select("a.substring(0, b)");
+
+		DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "AA\nB";
+	}
+
+	@Test
+	public void testSubstringWithMaxEnd() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+				new Tuple2<String, Integer>("ABCD", 2),
+				new Tuple2<String, Integer>("ABCD", 1));
+
+		ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b");
+
+		ExpressionOperation<JavaBatchTranslator> result = in
+				.select("a.substring(b)");
+
+		DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "CD\nBCD";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testNonWorkingSubstring1() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple2<String, Float>> ds = env.fromElements(
+				new Tuple2<String, Float>("ABCD", 2.0f),
+				new Tuple2<String, Float>("ABCD", 1.0f));
+
+		ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b");
+
+		ExpressionOperation<JavaBatchTranslator> result = in
+				.select("a.substring(0, b)");
+
+		DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "";
+	}
+
+	@Test(expected = ExpressionException.class)
+	public void testNonWorkingSubstring2() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple2<String, String>> ds = env.fromElements(
+				new Tuple2<String, String>("ABCD", "a"),
+				new Tuple2<String, String>("ABCD", "b"));
+
+		ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b");
+
+		ExpressionOperation<JavaBatchTranslator> result = in
+				.select("a.substring(b, 15)");
+
+		DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+		resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		env.execute();
+
+		expected = "";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java
new file mode 100644
index 0000000..b75a2ee
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java
@@ -0,0 +1,100 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.api.scala.expressions.test;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.scala.PageRankExpression;
+import org.apache.flink.test.testdata.PageRankData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class PageRankExpressionITCase extends JavaProgramTestBase {
+
+	private static int NUM_PROGRAMS = 2;
+
+	private int curProgId = config.getInteger("ProgramId", -1);
+
+	private String verticesPath;
+	private String edgesPath;
+	private String resultPath;
+	private String expectedResult;
+
+	public PageRankExpressionITCase(Configuration config) {
+		super(config);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+		verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
+		edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = runProgram(curProgId);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
+	}
+
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return toParameterList(tConfigs);
+	}
+
+
+	public String runProgram(int progId) throws Exception {
+
+		switch(progId) {
+		case 1: {
+			PageRankExpression.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData
+					.NUM_VERTICES + "", "3"});
+			return PageRankData.RANKS_AFTER_3_ITERATIONS;
+		}
+		case 2: {
+			// start with a very high number of iterations so that the dynamic convergence criterion must handle termination
+			PageRankExpression.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
+			return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
+		}
+
+		default:
+			throw new IllegalArgumentException("Invalid program id");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala
deleted file mode 100644
index fee7ac8..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testAggregationTypes: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
-      .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "231,1,21,21,11"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAggregationOnNonExistingField: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
-      .select('foo.avg)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test
-  def testWorkingAggregationDataTypes: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toExpression
-      .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,1,1,1.5,1.5,2"
-  }
-
-  @Test
-  def testAggregationWithArithmetic: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toExpression
-      .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT")
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "5.5,2 THE COUNT"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNonWorkingAggregationDataTypes: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("Hello", 1)).toExpression
-      .select('_1.sum)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNoNestedAggregations: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("Hello", 1)).toExpression
-      .select('_2.sum.sum)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala
deleted file mode 100644
index 8921e89..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testAs: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToFewFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToManyFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithAmbiguousFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithNonFieldReference1: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // as can only have field references
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithNonFieldReference2: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // as can only have field references
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala
deleted file mode 100644
index b3f8ef3..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testAutoCastToString: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
-      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1b,1s,1i,1L,1.0f,1.0d"
-  }
-
-  @Test
-  def testNumericAutoCastInArithmetic: Unit = {
-
-    // don't test everything, just some common cast directions
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
-      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "2,2,2,2.0,2.0,2.0"
-  }
-
-  @Test
-  def testNumericAutoCastInComparison: Unit = {
-
-    // don't test everything, just some common cast directions
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
-      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "2,2,2,2,2.0,2.0"
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala
deleted file mode 100644
index de41f65..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testArithmetic: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((5, 10)).as('a, 'b)
-      .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "0,10,2,10,1,-5"
-  }
-
-  @Test
-  def testLogic: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((5, true)).as('a, 'b)
-      .select('b && true, 'b && false, 'b || false, !'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "true,false,true,false"
-  }
-
-  @Test
-  def testComparisons: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
-      .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "true,true,false,false,true"
-  }
-
-  @Test
-  def testBitwiseOperations: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,7,6,-4"
-  }
-
-  @Test
-  def testBitwiseWithAutocast: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,7,6,-4"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testBitwiseWithNonWorkingAutocast: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((3.0, 5)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,7,6,-4"
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala
deleted file mode 100644
index 4b46458..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.tree.Literal
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit._
-
-
-@RunWith(classOf[Parameterized])
-class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = null
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testAllRejectingFilter: Unit = {
-    /*
-     * Test all-rejecting filter.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-
-    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "\n"
-  }
-
-  @Test
-  def testAllPassingFilter: Unit = {
-    /*
-     * Test all-passing filter.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-
-    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-  }
-
-  @Test
-  def testFilterOnStringTupleField: Unit = {
-    /*
-     * Test filter on String tuple field.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val filterDs = ds.filter( _._3.contains("world") )
-    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField: Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-
-    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-      "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-  }
-
-  // These two not yet done, but are planned
-
-  @Ignore
-  @Test
-  def testFilterBasicType: Unit = {
-    /*
-     * Test filter on basic type
-     */
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.getStringDataSet(env)
-
-    val filterDs = ds.filter( _.startsWith("H") )
-
-    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
-  }
-
-  @Ignore
-  @Test
-  def testFilterOnCustomType: Unit = {
-    /*
-     * Test filter on custom type
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.getCustomTypeDataSet(env)
-    val filterDs = ds.filter( _.myString.contains("a") )
-    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala
deleted file mode 100644
index 06f61db..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testGroupingOnNonExistentField: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('_foo)
-      .select('a.avg)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test
-  def testGroupedAggregate: Unit = {
-
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
-  }
-
-  @Test
-  def testGroupingKeyForwardIfNotUsed: Unit = {
-
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
-  }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala
deleted file mode 100644
index d52acf6..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testJoin: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-  }
-
-  @Test
-  def testJoinWithFilter: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "Hi,Hallo\n"
-  }
-
-  @Test
-  def testJoinWithMultipleKeys: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testJoinNonExistingKey: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testJoinWithNonMatchingKeyTypes: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test
-  def testJoinWithAggregation: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('a === 'd).select('g.count)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "6"
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java
deleted file mode 100644
index aefc8cf..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.api.scala.expressions;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.scala.PageRankExpression;
-import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@RunWith(Parameterized.class)
-public class PageRankExpressionITCase extends JavaProgramTestBase {
-
-	private static int NUM_PROGRAMS = 2;
-
-	private int curProgId = config.getInteger("ProgramId", -1);
-
-	private String verticesPath;
-	private String edgesPath;
-	private String resultPath;
-	private String expectedResult;
-
-	public PageRankExpressionITCase(Configuration config) {
-		super(config);
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-		verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
-		edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = runProgram(curProgId);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-
-		return toParameterList(tConfigs);
-	}
-
-
-	public String runProgram(int progId) throws Exception {
-
-		switch(progId) {
-		case 1: {
-			PageRankExpression.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData
-					.NUM_VERTICES + "", "3"});
-			return PageRankData.RANKS_AFTER_3_ITERATIONS;
-		}
-		case 2: {
-			// start with a very high number of iteration such that the dynamic convergence criterion must handle termination
-			PageRankExpression.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
-			return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
-		}
-
-		default:
-			throw new IllegalArgumentException("Invalid program id");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala
deleted file mode 100644
index b286421..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testSimpleSelectAll: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression.select('_1, '_2, '_3)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-  }
-
-  @Test
-  def testSimpleSelectWithNaming: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
-      .select('_1 as 'a, '_2 as 'b)
-      .select('a, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToFewFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToManyFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithAmbiguousFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c as 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala
deleted file mode 100644
index 3a7ad02..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testSubstring: Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
-      .select('a.substring(0, 'b))
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "AA\nB"
-  }
-
-  @Test
-  def testSubstringWithMaxEnd: Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
-      .select('a.substring('b))
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "CD\nBCD"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNonWorkingSubstring1: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
-      .select('a.substring(0, 'b))
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "AAA\nBB"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNonWorkingSubstring2: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
-      .select('a.substring('b, 15))
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "AAA\nBB"
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala
new file mode 100644
index 0000000..4a358bc
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+ * Integration tests for aggregations (`sum`, `min`, `max`, `count`, `avg`) inside `select`
+ * of the Scala expression API.
+ *
+ * Every test writes its result as text to a fresh temporary file and stores the text it
+ * expects in `expected`; the actual comparison is performed in `after`.
+ */
+@RunWith(classOf[Parameterized])
+class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  // URI of the output file of the current test, assigned in before().
+  private var resultPath: String = null
+  // Text that the output file is compared against in after().
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  /** Accessor through which JUnit obtains the temporary folder rule. */
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  /** Creates a new file in the temporary folder and remembers its URI as the output path. */
+  @Before
+  def before(): Unit = {
+    val outputFile = tempFolder.newFile()
+    resultPath = outputFile.toURI.toString
+  }
+
+  /** Compares the content written to `resultPath` with `expected`. */
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  /** All five aggregations applied to field `_1` of the 3-tuple data set. */
+  @Test
+  def testAggregationTypes: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tuples = CollectionDataSets.get3TupleDataSet(execEnv).toExpression
+    val aggregated = tuples.select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "231,1,21,21,11"
+  }
+
+  /** Aggregating a field name that the input does not have must be rejected. */
+  @Test(expected = classOf[ExpressionException])
+  def testAggregationOnNonExistingField: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tuples = CollectionDataSets.get3TupleDataSet(execEnv).toExpression
+    val aggregated = tuples.select('foo.avg)
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = ""
+  }
+
+  /** `avg` over Byte, Short, Int, Long, Float and Double fields, plus `count` on a String. */
+  @Test
+  def testWorkingAggregationDataTypes: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val rows = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao"))
+    val aggregated = rows.toExpression
+      .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "1,1,1,1,1.5,1.5,2"
+  }
+
+  /** Arithmetic and string concatenation both below and on top of an aggregation. */
+  @Test
+  def testAggregationWithArithmetic: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val rows = execEnv.fromElements((1f, "Hello"), (2f, "Ciao"))
+    val aggregated = rows.toExpression
+      .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT")
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "5.5,2 THE COUNT"
+  }
+
+  /** `sum` on a String field must be rejected. */
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingAggregationDataTypes: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val rows = execEnv.fromElements(("Hello", 1))
+    val aggregated = rows.toExpression.select('_1.sum)
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = ""
+  }
+
+  /** An aggregation applied to the result of another aggregation must be rejected. */
+  @Test(expected = classOf[ExpressionException])
+  def testNoNestedAggregations: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val rows = execEnv.fromElements(("Hello", 1))
+    val aggregated = rows.toExpression.select('_2.sum.sum)
+
+    aggregated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = ""
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
new file mode 100644
index 0000000..18d7b09
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+ * Integration tests for naming the fields of a data set with `as` in the Scala expression API.
+ *
+ * Every test writes its result as text to a fresh temporary file and stores the text it
+ * expects in `expected`; the actual comparison is performed in `after`.
+ */
+@RunWith(classOf[Parameterized])
+class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  // URI of the output file of the current test, assigned in before().
+  private var resultPath: String = null
+  // Text that the output file is compared against in after().
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  /** Accessor through which JUnit obtains the temporary folder rule. */
+  @Rule
+  def tempFolder = _tempFolder
+
+  /** Creates a new file in the temporary folder and remembers its URI as the output path. */
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  /** Compares the content written to `resultPath` with `expected`. */
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  /** Naming all three fields and writing the result yields the 21 input records. */
+  @Test
+  def testAs: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    // One expected record per line.
+    expected = "1,1,Hi\n" +
+      "2,2,Hello\n" +
+      "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" +
+      "5,3,I am fine.\n" +
+      "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" +
+      "8,4,Comment#2\n" +
+      "9,4,Comment#3\n" +
+      "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" +
+      "12,5,Comment#6\n" +
+      "13,5,Comment#7\n" +
+      "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" +
+      "16,6,Comment#10\n" +
+      "17,6,Comment#11\n" +
+      "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" +
+      "20,6,Comment#14\n" +
+      "21,6,Comment#15\n"
+  }
+
+  /** Two names for a three-field input must be rejected. */
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToFewFields: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  /** Four names for a three-field input must be rejected. */
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToManyFields: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  /** Using the same name for two fields must be rejected. */
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithAmbiguousFields: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  /** An arithmetic expression in place of a field name must be rejected. */
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithNonFieldReference1: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // as can only have field references; the other names are distinct so that a duplicate
+    // name (see testAsWithAmbiguousFields) cannot be the reason for the exception
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  /** An alias expression (`'a as 'foo`) in place of a field name must be rejected. */
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithNonFieldReference2: Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // as can only have field references; the other names are distinct so that a duplicate
+    // name (see testAsWithAmbiguousFields) cannot be the reason for the exception
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
new file mode 100644
index 0000000..599ef6b
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+ * Integration tests for automatic casting in the Scala expression API: numeric values to
+ * String in concatenation, and between numeric types in arithmetic and comparisons.
+ *
+ * Every test writes its result as text to a fresh temporary file and stores the text it
+ * expects in `expected`; the actual comparison is performed in `after`.
+ */
+@RunWith(classOf[Parameterized])
+class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  // URI of the output file of the current test, assigned in before().
+  private var resultPath: String = null
+  // Text that the output file is compared against in after().
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  /** Accessor through which JUnit obtains the temporary folder rule. */
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  /** Creates a new file in the temporary folder and remembers its URI as the output path. */
+  @Before
+  def before(): Unit = {
+    val outputFile = tempFolder.newFile()
+    resultPath = outputFile.toURI.toString
+  }
+
+  /** Compares the content written to `resultPath` with `expected`. */
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  /** Appending a String literal to a field of each numeric type. */
+  @Test
+  def testAutoCastToString: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val numbers = execEnv.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d))
+    val concatenated = numbers.toExpression
+      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
+
+    concatenated.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "1b,1s,1i,1L,1.0f,1.0d"
+  }
+
+  /** Adding a literal of a different numeric type to each numeric field. */
+  @Test
+  def testNumericAutoCastInArithmetic: Unit = {
+    // don't test everything, just some common cast directions
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val numbers = execEnv.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d))
+    val incremented = numbers.toExpression
+      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
+
+    incremented.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "2,2,2,2.0,2.0,2.0"
+  }
+
+  /** Comparing each numeric field with a literal of a different numeric type in a filter. */
+  @Test
+  def testNumericAutoCastInComparison: Unit = {
+    // don't test everything, just some common cast directions
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val numbers = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d))
+    val filtered = numbers.as('a, 'b, 'c, 'd, 'e, 'f)
+      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
+
+    filtered.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "2,2,2,2,2.0,2.0"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
new file mode 100644
index 0000000..9d37f70
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+ * Integration tests for arithmetic, logical, comparison and bitwise operators of the Scala
+ * expression API when used inside `select`.
+ *
+ * Every test writes its result as text to a fresh temporary file and stores the text it
+ * expects in `expected`; the actual comparison is performed in `after`.
+ */
+@RunWith(classOf[Parameterized])
+class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  // URI of the output file of the current test, assigned in before().
+  private var resultPath: String = null
+  // Text that the output file is compared against in after().
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  /** Accessor through which JUnit obtains the temporary folder rule. */
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  /** Creates a new file in the temporary folder and remembers its URI as the output path. */
+  @Before
+  def before(): Unit = {
+    val outputFile = tempFolder.newFile()
+    resultPath = outputFile.toURI.toString
+  }
+
+  /** Compares the content written to `resultPath` with `expected`. */
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  /** Subtraction, addition, division, multiplication, modulo and unary minus on an Int. */
+  @Test
+  def testArithmetic: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((5, 10)).as('a, 'b)
+    val computed = input.select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "0,10,2,10,1,-5"
+  }
+
+  /** Conjunction, disjunction and negation on a Boolean field. */
+  @Test
+  def testLogic: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((5, true)).as('a, 'b)
+    val computed = input.select('b && true, 'b && false, 'b || false, !'b)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "true,false,true,false"
+  }
+
+  /** Ordering comparisons between fields and the null checks `isNull` / `isNotNull`. */
+  @Test
+  def testComparisons: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((5, 5, 4)).as('a, 'b, 'c)
+    val computed = input.select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "true,true,false,false,true"
+  }
+
+  /** Bitwise and, or, xor and complement on two Byte fields. */
+  @Test
+  def testBitwiseOperations: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((3.toByte, 5.toByte)).as('a, 'b)
+    val computed = input.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "1,7,6,-4"
+  }
+
+  /** Bitwise operators with operands of different integral types (Int and Byte). */
+  @Test
+  def testBitwiseWithAutocast: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((3, 5.toByte)).as('a, 'b)
+    val computed = input.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "1,7,6,-4"
+  }
+
+  /** Bitwise operators with a Double operand must be rejected. */
+  @Test(expected = classOf[ExpressionException])
+  def testBitwiseWithNonWorkingAutocast: Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val input = execEnv.fromElements((3.0, 5)).as('a, 'b)
+    val computed = input.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    computed.writeAsText(resultPath, WriteMode.OVERWRITE)
+    execEnv.execute()
+    expected = "1,7,6,-4"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
new file mode 100644
index 0000000..2841534
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.tree.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+
+/**
+ * Integration tests for `filter`, mostly with predicates of the Scala expression API.
+ *
+ * Every test writes its result to a fresh temporary file and stores the text it expects in
+ * `expected`; the actual comparison is performed in `after`.
+ */
+@RunWith(classOf[Parameterized])
+class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  // URI of the output file of the current test, assigned in before().
+  private var resultPath: String = null
+  // Text that the output file is compared against in after(). Starts out as the empty
+  // string so that after() never passes null to the comparison when a test fails before
+  // it has assigned its expectation.
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  /** Accessor through which JUnit obtains the temporary folder rule. */
+  @Rule
+  def tempFolder = _tempFolder
+
+  /** Creates a new file in the temporary folder and remembers its URI as the output path. */
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  /** Compares the content written to `resultPath` with `expected`. */
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testAllRejectingFilter: Unit = {
+    /*
+     * Test all-rejecting filter.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter(Literal(false))
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "\n"
+  }
+
+  @Test
+  def testAllPassingFilter: Unit = {
+    /*
+     * Test all-passing filter.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter(Literal(true))
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    // One expected record per line.
+    expected = "1,1,Hi\n" +
+      "2,2,Hello\n" +
+      "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" +
+      "5,3,I am fine.\n" +
+      "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" +
+      "8,4,Comment#2\n" +
+      "9,4,Comment#3\n" +
+      "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" +
+      "12,5,Comment#6\n" +
+      "13,5,Comment#7\n" +
+      "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" +
+      "16,6,Comment#10\n" +
+      "17,6,Comment#11\n" +
+      "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" +
+      "20,6,Comment#14\n" +
+      "21,6,Comment#15\n"
+  }
+
+  @Test
+  def testFilterOnStringTupleField: Unit = {
+    /*
+     * Test filter on String tuple field.
+     *
+     * Note: the predicate here is a plain Scala function applied to the tuple data set,
+     * not an expression; the fields are not named with `as`.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val filterDs = ds.filter(_._3.contains("world"))
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n"
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField: Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    // keep the records whose first field is even
+    val filterDs = ds.filter('a % 2 === 0)
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "2,2,Hello\n" +
+      "4,3,Hello world, how are you?\n" +
+      "6,3,Luke Skywalker\n" +
+      "8,4,Comment#2\n" +
+      "10,4,Comment#4\n" +
+      "12,5,Comment#6\n" +
+      "14,5,Comment#8\n" +
+      "16,6,Comment#10\n" +
+      "18,6,Comment#12\n" +
+      "20,6,Comment#14\n"
+  }
+
+  // These two not yet done, but are planned
+
+  @Ignore
+  @Test
+  def testFilterBasicType: Unit = {
+    /*
+     * Test filter on basic type
+     */
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+
+    val filterDs = ds.filter(_.startsWith("H"))
+
+    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi\n" +
+      "Hello\n" +
+      "Hello world\n" +
+      "Hello world, how are you?\n"
+  }
+
+  @Ignore
+  @Test
+  def testFilterOnCustomType: Unit = {
+    /*
+     * Test filter on custom type
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val filterDs = ds.filter(_.myString.contains("a"))
+    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "3,3,Hello world, how are you?\n" +
+      "3,4,I am fine.\n" +
+      "3,5,Luke Skywalker\n"
+  }
+
+}