You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target was not carried over into this plain-text copy).
Posted to commits@flink.apache.org by tw...@apache.org on 2016/10/04 10:00:32 UTC

flink git commit: [FLINK-3656] [table] Convert expression tests to unit tests

Repository: flink
Updated Branches:
  refs/heads/master 95c08eab3 -> 171d10930


[FLINK-3656] [table] Convert expression tests to unit tests

This closes #2567.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/171d1093
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/171d1093
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/171d1093

Branch: refs/heads/master
Commit: 171d10930b3904f51b7d60531bfc0837b7cc19bd
Parents: 95c08ea
Author: twalthr <tw...@apache.org>
Authored: Thu Sep 29 14:54:36 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Oct 4 11:56:16 2016 +0200

----------------------------------------------------------------------
 .../table/expressions/ExpressionParser.scala    |   2 +-
 .../flink/api/java/batch/table/CalcITCase.java  |  34 +++
 .../api/java/batch/table/ExpressionsITCase.java | 239 -------------------
 .../batch/table/StringExpressionsITCase.java    | 216 -----------------
 .../flink/api/scala/batch/sql/CalcITCase.scala  |  51 ++++
 .../api/scala/batch/sql/ExpressionsITCase.scala | 176 --------------
 .../api/scala/batch/table/CalcITCase.scala      |  24 ++
 .../api/scala/batch/table/CastingITCase.scala   | 190 ---------------
 .../scala/batch/table/ExpressionsITCase.scala   | 181 --------------
 .../batch/table/StringExpressionsITCase.scala   |  86 -------
 .../table/expressions/ScalarFunctionsTest.scala |  20 +-
 .../table/expressions/ScalarOperatorsTest.scala | 219 +++++++++++++++++
 12 files changed, 348 insertions(+), 1090 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index 4707adf..c5e5040 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -109,7 +109,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       "LONG" ^^ { ti => BasicTypeInfo.LONG_TYPE_INFO } |
       "FLOAT" ^^ { ti => BasicTypeInfo.FLOAT_TYPE_INFO } |
       "DOUBLE" ^^ { ti => BasicTypeInfo.DOUBLE_TYPE_INFO } |
-      ("BOOL" | "BOOLEAN" ) ^^ { ti => BasicTypeInfo.BOOLEAN_TYPE_INFO } |
+      ("BOOLEAN" | "BOOL") ^^ { ti => BasicTypeInfo.BOOLEAN_TYPE_INFO } |
       "STRING" ^^ { ti => BasicTypeInfo.STRING_TYPE_INFO } |
       "DATE" ^^ { ti => SqlTimeTypeInfo.DATE.asInstanceOf[TypeInformation[_]] } |
       "TIMESTAMP" ^^ { ti => SqlTimeTypeInfo.TIMESTAMP } |

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
index fcdf2e1..6fc8173 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.batch.table;
 
 import java.util.Arrays;
 import java.util.Collection;
+import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
@@ -29,6 +30,7 @@ import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.table.TableEnvironment;
 import org.apache.flink.api.table.ValidationException;
+import org.apache.flink.api.table.functions.ScalarFunction;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -286,5 +288,37 @@ public class CalcITCase extends TableProgramsTestBase {
 			.filter("foo = 17");
 	}
 
+	public static class OldHashCode extends ScalarFunction {
+		public int eval(String s) {
+			return -1;
+		}
+	}
+
+	public static class HashCode extends ScalarFunction {
+		public int eval(String s) {
+			return s.hashCode();
+		}
+	}
+
+	@Test
+	public void testUserDefinedScalarFunction() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		tableEnv.registerFunction("hashCode", new OldHashCode());
+		tableEnv.registerFunction("hashCode", new HashCode());
+
+		DataSource<String> input = env.fromElements("a", "b", "c");
+
+		Table table = tableEnv.fromDataSet(input, "text");
+
+		Table result = table.select("text.hashCode()");
+
+		DataSet<Integer> ds = tableEnv.toDataSet(result, Integer.class);
+		List<Integer> results = ds.collect();
+		String expected = "97\n98\n99";
+		compareResultAsText(results, expected);
+	}
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
deleted file mode 100644
index 5476644..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.codegen.CodeGenException;
-import org.apache.flink.api.table.functions.ScalarFunction;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.table.ValidationException;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class ExpressionsITCase extends TableProgramsTestBase {
-
-	public ExpressionsITCase(TestExecutionMode mode, TableConfigMode configMode) {
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testArithmetic() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Integer, Integer>> input =
-				env.fromElements(new Tuple2<>(5, 10));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b");
-
-		Table result = table.select(
-				"a - 5, a + 5, a / 2, a * 2, a % 2, -a");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "0,10,2,10,1,-5";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testLogic() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Integer, Boolean>> input =
-				env.fromElements(new Tuple2<>(5, true));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b");
-
-		Table result = table.select(
-				"b && true, b && false, b || false, !b");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "true,false,true,false";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testComparisons() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple3<Integer, Integer, Integer>> input =
-				env.fromElements(new Tuple3<>(5, 5, 4));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table.select(
-				"a > c, a >= b, a < c, a.isNull, a.isNotNull");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "true,true,false,false,true";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testNullLiteral() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Integer, Integer>> input =
-				env.fromElements(new Tuple2<>(1, 0));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b");
-
-		Table result = table.select("a, b, Null(INT), Null(STRING) === ''");
-
-		try {
-			DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-			if (!config().getNullCheck()) {
-				fail("Exception expected if null check is disabled.");
-			}
-			List<Row> results = ds.collect();
-			String expected = "1,0,null,null";
-			compareResultAsText(results, expected);
-		}
-		catch (CodeGenException e) {
-			if (config().getNullCheck()) {
-				throw e;
-			}
-		}
-	}
-
-	@Test
-	public void testIf() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Integer, Boolean>> input =
-				env.fromElements(new Tuple2<>(5, true));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b");
-
-		Table result = table.select(
-				"(b && true).?('true', 'false')," +
-					"false.?('true', 'false')," +
-					"true.?(true.?(true.?(10, 4), 4), 4)," +
-					"?((b && true), 'true', 'false')");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "true,false,10,true";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testIfInvalidTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Integer, Boolean>> input =
-				env.fromElements(new Tuple2<>(5, true));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b");
-
-		Table result = table.select("(b && true).?(5, 'false')");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "true,false,3,10";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testComplexExpression() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple3<Integer, Integer, Integer>> input =
-				env.fromElements(new Tuple3<>(5, 5, 4));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table.select(
-				"a.isNull().isNull," +
-					"a.abs() + a.abs().abs().abs().abs()," +
-					"a.cast(STRING) + a.cast(STRING)," +
-					"CAST(ISNULL(b), INT)," +
-					"ISNULL(CAST(b, INT).abs()) === false," +
-					"((((true) === true) || false).cast(STRING) + 'X ').trim," +
-					"12.isNull");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "false,10,55,0,true,trueX,false";
-		compareResultAsText(results, expected);
-	}
-
-	public static class OldHashCode extends ScalarFunction {
-		public int eval(String s) {
-			return -1;
-		}
-	}
-
-	public static class HashCode extends ScalarFunction {
-		public int eval(String s) {
-			return s.hashCode();
-		}
-	}
-
-	@Test
-	public void testUserDefinedScalarFunction() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		tableEnv.registerFunction("hashCode", new OldHashCode());
-		tableEnv.registerFunction("hashCode", new HashCode());
-
-		DataSource<String> input = env.fromElements("a", "b", "c");
-
-		Table table = tableEnv.fromDataSet(input, "text");
-
-		Table result = table.select("text.hashCode()");
-
-		DataSet<Integer> ds = tableEnv.toDataSet(result, Integer.class);
-		List<Integer> results = ds.collect();
-		String expected = "97\n98\n99";
-		compareResultAsText(results, expected);
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
deleted file mode 100644
index db5eac9..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/StringExpressionsITCase.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class StringExpressionsITCase extends MultipleProgramsTestBase {
-
-	public StringExpressionsITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testSubstring() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-				new Tuple2<>("AAAA", 2),
-				new Tuple2<>("BBBB", 1));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-				.select("a.substring(1, b)");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "AA\nB";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSubstringWithByteStart() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Byte>> ds = env.fromElements(
-			new Tuple2<>("AAAA", (byte) 2),
-			new Tuple2<>("BBBB", (byte) 1));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-			.select("a.substring(1, b)");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "AA\nB";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSubstringWithMaxEnd() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-				new Tuple2<>("ABCD", 3),
-				new Tuple2<>("ABCD", 2));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-				.select("a.substring(b)");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "CD\nBCD";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testNonWorkingSubstring1() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Float>> ds = env.fromElements(
-				new Tuple2<>("ABCD", 2.0f),
-				new Tuple2<>("ABCD", 1.0f));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-			// Must fail. Second parameter of substring must be an Integer not a Double.
-			.select("a.substring(0, b)");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.collect();
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testNonWorkingSubstring2() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, String>> ds = env.fromElements(
-				new Tuple2<>("ABCD", "a"),
-				new Tuple2<>("ABCD", "b"));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-			// Must fail. First parameter of substring must be an Integer not a String.
-			.select("a.substring(b, 15)");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.collect();
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testGeneratedCodeForStringComparison() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
-		// Must fail because the comparison here is between Integer(column 'a') and (String 'Fred')
-		Table res = in.filter("a = 'Fred'" );
-		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testGeneratedCodeForIntegerEqualsComparison() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
-		// Must fail because the comparison here is between String(column 'c') and (Integer 10)
-		Table res = in.filter("c = 10" );
-		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testGeneratedCodeForIntegerGreaterComparison() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
-		// Must fail because the comparison here is between String(column 'c') and (Integer 10)
-		Table res = in.filter("c > 10");
-		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
-	}
-
-	@Test
-	public void testStringConcat() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-			new Tuple2<>("ABCD", 3),
-			new Tuple2<>("ABCD", 2));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-			.select("a + b + 42");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "ABCD342\nABCD242";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testStringConcat1() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-			new Tuple2<>("ABCD", 3),
-			new Tuple2<>("ABCD", 2));
-
-		Table in = tableEnv.fromDataSet(ds, "a, b");
-
-		Table result = in
-			.select("42 + b + a");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "44ABCD\n45ABCD";
-		compareResultAsText(results, expected);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
index 49a97e3..155833b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
@@ -18,13 +18,17 @@
 
 package org.apache.flink.api.scala.batch.sql
 
+
+import java.sql.{Date, Time, Timestamp}
 import java.util
 
 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.sql.FilterITCase.MyHashCode
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.functions.ScalarFunction
 import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
@@ -264,6 +268,53 @@ class CalcITCase(
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testAdvancedDataTypes(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT a, b, c, DATE '1984-07-12', TIME '14:34:24', " +
+      "TIMESTAMP '1984-07-12 14:34:24' FROM MyTable"
+
+    val ds = env.fromElements((
+      Date.valueOf("1984-07-12"),
+      Time.valueOf("14:34:24"),
+      Timestamp.valueOf("1984-07-12 14:34:24")))
+    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
+      "1984-07-12,14:34:24,1984-07-12 14:34:24.0"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testUserDefinedScalarFunction(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    tEnv.registerFunction("hashCode",
+      new org.apache.flink.api.java.batch.table.CalcITCase.OldHashCode)
+    tEnv.registerFunction("hashCode", MyHashCode)
+
+    val ds = env.fromElements("a", "b", "c")
+    tEnv.registerDataSet("MyTable", ds, 'text)
+
+    val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
+
+    val expected = "97\n98\n99"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}
+
+object FilterITCase {
+  object MyHashCode extends ScalarFunction {
+    def eval(s: String): Int = s.hashCode()
+  }
 }
 
 object CalcITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/ExpressionsITCase.scala
deleted file mode 100644
index cc971dc..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/ExpressionsITCase.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.sql
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.java.batch.table.{ExpressionsITCase => JExpressionsITCase}
-import org.apache.flink.api.scala.batch.sql.ExpressionsITCase.MyHashCode
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.api.table.codegen.CodeGenException
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Assert._
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ExpressionsITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testNullLiteral(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT a, b, CAST(NULL AS INT), CAST(NULL AS VARCHAR) = '' FROM MyTable"
-
-    val t = env.fromElements((1, 0))
-    tEnv.registerDataSet("MyTable", t, 'a, 'b)
-
-    val result = tEnv.sql(sqlQuery)
-
-    try {
-      val ds = result.toDataSet[Row]
-      if (!config.getNullCheck) {
-        fail("Exception expected if null check is disabled.")
-      }
-      val results = ds.collect()
-      val expected = "1,0,null,null"
-      TestBaseUtils.compareResultAsText(results.asJava, expected)
-    }
-    catch {
-      case e: CodeGenException =>
-        if (config.getNullCheck) {
-          throw e
-        }
-    }
-  }
-
-  @Test
-  def testCase(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END," +
-      "CASE 2 WHEN 1 THEN 'a' ELSE 'b' END," +
-      "CASE 1 WHEN 1, 2 THEN '1 or 2' WHEN 2 THEN 'not possible' WHEN 3, 2 THEN '3' " +
-      "  ELSE 'none of the above' END" +
-      " FROM MyTable"
-
-    val ds = env.fromElements((1, 0))
-    tEnv.registerDataSet("MyTable", ds, 'a, 'b)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "b,b,1 or 2"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCaseWithNull(): Unit = {
-    if (!config.getNullCheck) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT " +
-      "CASE WHEN 'a'='a' THEN 1 END," +
-      "CASE 2 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END," +
-      "CASE a WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END," +
-      "CASE b WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END," +
-      "CASE 42 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END," +
-      "CASE 1 WHEN 1 THEN true WHEN 2 THEN false ELSE NULL END" +
-      " FROM MyTable"
-
-    val ds = env.fromElements((1, 0))
-    tEnv.registerDataSet("MyTable", ds, 'a, 'b)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,bcd,11,null,null,true"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAdvancedDataTypes(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT a, b, c, DATE '1984-07-12', TIME '14:34:24', " +
-      "TIMESTAMP '1984-07-12 14:34:24' FROM MyTable"
-
-    val ds = env.fromElements((
-      Date.valueOf("1984-07-12"),
-      Time.valueOf("14:34:24"),
-      Timestamp.valueOf("1984-07-12 14:34:24")))
-    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
-      "1984-07-12,14:34:24,1984-07-12 14:34:24.0"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-
-  @Test
-  def testUserDefinedScalarFunction(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    tEnv.registerFunction("hashCode", new JExpressionsITCase.OldHashCode)
-    tEnv.registerFunction("hashCode", MyHashCode)
-
-    val ds = env.fromElements("a", "b", "c")
-    tEnv.registerDataSet("MyTable", ds, 'text)
-
-    val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
-
-    val expected = "97\n98\n99"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}
-
-object ExpressionsITCase {
-  object MyHashCode extends ScalarFunction {
-    def eval(s: String): Int = s.hashCode()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
index 4ffda87..8a2612e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.batch.table
 
+import java.sql.{Date, Time, Timestamp}
 import java.util
 
 import org.apache.flink.api.scala._
@@ -400,6 +401,29 @@ class CalcITCase(
     val results = joinT.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testAdvancedDataTypes(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = env
+      .fromElements((
+        BigDecimal("78.454654654654654").bigDecimal,
+        BigDecimal("4E+9999").bigDecimal,
+        Date.valueOf("1984-07-12"),
+        Time.valueOf("14:34:24"),
+        Timestamp.valueOf("1984-07-12 14:34:24")))
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
+        Date.valueOf("1984-07-12"), Time.valueOf("14:34:24"),
+        Timestamp.valueOf("1984-07-12 14:34:24"))
+
+    val expected = "78.454654654654654,4E+9999,1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
+      "11.2,11.2,1984-07-12,14:34:24,1984-07-12 14:34:24.0"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }
 
 object CalcITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CastingITCase.scala
deleted file mode 100644
index 08de633..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CastingITCase.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import java.util.Date
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, TableEnvironment, Types}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testNumericAutoCastInArithmetic(): Unit = {
-
-    // don't test everything, just some common cast directions
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tEnv)
-      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
-
-    val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testNumericAutoCastInComparison(): Unit = {
-
-    // don't test everything, just some common cast directions
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f)
-      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
-
-    val expected = "2,2,2,2,2.0,2.0"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-  @Ignore // TODO support advanced String operations
-  @Test
-  def testAutoCastToString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable(tEnv)
-      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date")
-
-    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCasting(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements((1, 0.0, 1L, true))
-      .toTable(tEnv)
-      .select(
-        // * -> String
-        '_1.cast(Types.STRING),
-        '_2.cast(Types.STRING),
-        '_3.cast(Types.STRING),
-        '_4.cast(Types.STRING),
-        // NUMERIC TYPE -> Boolean
-        '_1.cast(Types.BOOLEAN),
-        '_2.cast(Types.BOOLEAN),
-        '_3.cast(Types.BOOLEAN),
-        // NUMERIC TYPE -> NUMERIC TYPE
-        '_1.cast(Types.DOUBLE),
-        '_2.cast(Types.INT),
-        '_3.cast(Types.SHORT),
-        // Boolean -> NUMERIC TYPE
-        '_4.cast(Types.DOUBLE),
-        // identity casting
-        '_1.cast(Types.INT),
-        '_2.cast(Types.DOUBLE),
-        '_3.cast(Types.LONG),
-        '_4.cast(Types.BOOLEAN))
-
-    val expected = "1,0.0,1,true," +
-      "true,false,true," +
-      "1.0,0,1," +
-      "1.0," +
-      "1,0.0,1,true\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCastFromString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("1", "true", "2.0"))
-      .toTable(tEnv)
-      .select(
-        // String -> BASIC TYPE (not String, Date, Void, Character)
-        '_1.cast(Types.BYTE),
-        '_1.cast(Types.SHORT),
-        '_1.cast(Types.INT),
-        '_1.cast(Types.LONG),
-        '_3.cast(Types.DOUBLE),
-        '_3.cast(Types.FLOAT),
-        '_2.cast(Types.BOOLEAN))
-
-    val expected = "1,1,1,1,2.0,2.0,true\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Ignore // Date types not supported yet
-  @Test
-  def testCastDateFromString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"))
-      .toTable(tEnv)
-      .select(
-        '_1.cast(Types.DATE).cast(Types.STRING),
-        '_2.cast(Types.DATE).cast(Types.STRING),
-        '_3.cast(Types.DATE).cast(Types.STRING),
-        '_4.cast(Types.DATE).cast(Types.STRING))
-
-    val expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," +
-      "1970-01-17 17:47:53.775\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Ignore // Date types not supported yet
-  @Test
-  def testCastDateToStringAndLong(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000"))
-    val t = ds.toTable(tEnv)
-      .select('_1.cast(Types.DATE).as('f0),
-        '_2.cast(Types.DATE).as('f1))
-      .select('f0.cast(Types.STRING),
-        'f0.cast(Types.LONG),
-        'f1.cast(Types.STRING),
-        'f1.cast(Types.LONG))
-
-    val expected = "2011-05-03 15:51:36.000,1304437896000," +
-      "2011-05-03 15:51:36.000,1304437896000\n"
-    val result = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(result.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala
deleted file mode 100644
index 0107422..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ExpressionsITCase.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.codegen.CodeGenException
-import org.apache.flink.api.table.expressions.Null
-import org.apache.flink.api.table.{Row, TableEnvironment, Types, ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Assert._
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ExpressionsITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testArithmetic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((5, 10)).toTable(tEnv, 'a, 'b)
-      .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a, 3.toExpr + 'a)
-
-    val expected = "0,10,2,10,1,-5,8"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testLogic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((5, true)).toTable(tEnv, 'a, 'b)
-      .select('b && true, 'b && false, 'b || false, !'b)
-
-    val expected = "true,false,true,false"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testComparisons(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((5, 5, 4)).toTable(tEnv, 'a, 'b, 'c)
-      .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull, 12.toExpr <= 'a)
-
-    val expected = "true,true,false,false,true,false"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCaseInsensitiveForAs(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((3, 5.toByte)).toTable(tEnv, 'a, 'b)
-      .groupBy("a").select("a, a.count As cnt")
-
-    val expected = "3,1"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testNullLiteral(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((1, 0)).toTable(tEnv, 'a, 'b)
-      .select(
-        'a,
-        'b,
-        Null(Types.INT),
-        Null(Types.STRING) === "")
-
-    try {
-      val ds = t.toDataSet[Row]
-      if (!config.getNullCheck) {
-        fail("Exception expected if null check is disabled.")
-      }
-      val results = ds.collect()
-      val expected = "1,0,null,null"
-      TestBaseUtils.compareResultAsText(results.asJava, expected)
-    }
-    catch {
-      case e: CodeGenException =>
-        if (config.getNullCheck) {
-          throw e
-        }
-    }
-  }
-
-  @Test
-  def testIf(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((5, true)).toTable(tEnv, 'a, 'b)
-      .select(
-        ('b && true).?("true", "false"),
-        false.?("true", "false"),
-        true.?(true.?(true.?(10, 4), 4), 4))
-
-    val expected = "true,false,10"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIfInvalidTypes(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements((5, true)).toTable(tEnv, 'a, 'b)
-      .select(('b && true).?(5, "false"))
-
-    val expected = "true,false,3,10"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAdvancedDataTypes(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env
-      .fromElements((
-        BigDecimal("78.454654654654654").bigDecimal,
-        BigDecimal("4E+9999").bigDecimal,
-        Date.valueOf("1984-07-12"),
-        Time.valueOf("14:34:24"),
-        Timestamp.valueOf("1984-07-12 14:34:24")))
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
-        Date.valueOf("1984-07-12"), Time.valueOf("14:34:24"),
-        Timestamp.valueOf("1984-07-12 14:34:24"))
-
-    val expected = "78.454654654654654,4E+9999,1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
-      "11.2,11.2,1984-07-12,14:34:24,1984-07-12 14:34:24.0"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/StringExpressionsITCase.scala
deleted file mode 100644
index f7d998b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/StringExpressionsITCase.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testSubstring(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).toTable(tEnv, 'a, 'b)
-      .select('a.substring(1, 'b))
-
-    val expected = "AA\nB"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSubstringWithMaxEnd(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("ABCD", 3), ("ABCD", 2)).toTable(tEnv, 'a, 'b)
-      .select('a.substring('b))
-
-    val expected = "CD\nBCD"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNonWorkingSubstring1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).toTable(tEnv, 'a, 'b)
-      // must fail, second argument of substring must be Integer not Double.
-      .select('a.substring(0, 'b))
-
-    t.toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNonWorkingSubstring2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(("AAAA", "c"), ("BBBB", "d")).toTable(tEnv, 'a, 'b)
-      // must fail, first argument of substring must be Integer not String.
-      .select('a.substring('b, 15))
-
-    t.toDataSet[Row].collect()
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
index 5506793..beead51 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
 import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, Types}
+import org.apache.flink.api.table.{Row, Types, ValidationException}
 import org.junit.Test
 
 class ScalarFunctionsTest extends ExpressionTestBase {
@@ -83,6 +83,12 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "SUBSTRING(f0, 1, f7)",
       "Thi")
 
+    testAllApis(
+      'f0.substring(1.cast(Types.BYTE), 'f7),
+      "f0.substring(1.cast(BYTE), f7)",
+      "SUBSTRING(f0, CAST(1 AS TINYINT), f7)",
+      "Thi")
+
     testSqlApi(
       "SUBSTRING(f0 FROM 2 FOR 1)",
       "h")
@@ -92,6 +98,18 @@ class ScalarFunctionsTest extends ExpressionTestBase {
       "his is a test String.")
   }
 
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSubstring1(): Unit = {
+    // Must fail. Parameter of substring must be an Integer not a Double.
+    testTableApi("test".substring(2.0.toExpr), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSubstring2(): Unit = {
+    // Must fail. Parameter of substring must be an Integer not a String.
+    testTableApi("test".substring("test".toExpr), "FAIL", "FAIL")
+  }
+
   @Test
   def testTrim(): Unit = {
     testAllApis(

http://git-wip-us.apache.org/repos/asf/flink/blob/171d1093/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
new file mode 100644
index 0000000..1f5a069
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarOperatorsTest.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, Types, ValidationException}
+import org.junit.Test
+
+class ScalarOperatorsTest extends ExpressionTestBase {
+
+  @Test
+  def testCasting(): Unit = {
+    // test casting
+    // * -> String
+    testTableApi('f2.cast(Types.STRING), "f2.cast(STRING)", "1")
+    testTableApi('f5.cast(Types.STRING), "f5.cast(STRING)", "1.0")
+    testTableApi('f3.cast(Types.STRING), "f3.cast(STRING)", "1")
+    testTableApi('f6.cast(Types.STRING), "f6.cast(STRING)", "true")
+    // NUMERIC TYPE -> Boolean
+    testTableApi('f2.cast(Types.BOOLEAN), "f2.cast(BOOLEAN)", "true")
+    testTableApi('f7.cast(Types.BOOLEAN), "f7.cast(BOOLEAN)", "false")
+    testTableApi('f3.cast(Types.BOOLEAN), "f3.cast(BOOLEAN)", "true")
+    // NUMERIC TYPE -> NUMERIC TYPE
+    testTableApi('f2.cast(Types.DOUBLE), "f2.cast(DOUBLE)", "1.0")
+    testTableApi('f7.cast(Types.INT), "f7.cast(INT)", "0")
+    testTableApi('f3.cast(Types.SHORT), "f3.cast(SHORT)", "1")
+    // Boolean -> NUMERIC TYPE
+    testTableApi('f6.cast(Types.DOUBLE), "f6.cast(DOUBLE)", "1.0")
+    // identity casting
+    testTableApi('f2.cast(Types.INT), "f2.cast(INT)", "1")
+    testTableApi('f7.cast(Types.DOUBLE), "f7.cast(DOUBLE)", "0.0")
+    testTableApi('f3.cast(Types.LONG), "f3.cast(LONG)", "1")
+    testTableApi('f6.cast(Types.BOOLEAN), "f6.cast(BOOLEAN)", "true")
+    // NUMERIC TYPE -> BASIC TYPE (not String, Date, Void, Character)
+    testTableApi('f2.cast(Types.BYTE), "f2.cast(BYTE)", "1")
+    testTableApi('f2.cast(Types.SHORT), "f2.cast(SHORT)", "1")
+    testTableApi('f2.cast(Types.INT), "f2.cast(INT)", "1")
+    testTableApi('f2.cast(Types.LONG), "f2.cast(LONG)", "1")
+    testTableApi('f3.cast(Types.DOUBLE), "f3.cast(DOUBLE)", "1.0")
+    testTableApi('f3.cast(Types.FLOAT), "f3.cast(FLOAT)", "1.0")
+    testTableApi('f5.cast(Types.BOOLEAN), "f5.cast(BOOLEAN)", "true")
+
+    // numeric auto cast in arithmetic
+    testTableApi('f0 + 1, "f0 + 1", "2")
+    testTableApi('f1 + 1, "f1 + 1", "2")
+    testTableApi('f2 + 1L, "f2 + 1L", "2")
+    testTableApi('f3 + 1.0f, "f3 + 1.0f", "2.0")
+    testTableApi('f3 + 1.0d, "f3 + 1.0d", "2.0")
+    testTableApi('f5 + 1, "f5 + 1", "2.0")
+    testTableApi('f3 + 1.0d, "f3 + 1.0d", "2.0")
+    testTableApi('f4 + 'f0, "f4 + f0", "2.0")
+
+    // numeric auto cast in comparison
+    testTableApi(
+      'f0 > 0 && 'f1 > 0 && 'f2 > 0L && 'f4 > 0.0f && 'f5 > 0.0d  && 'f3 > 0,
+      "f0 > 0 && f1 > 0 && f2 > 0L && f4 > 0.0f && f5 > 0.0d  && f3 > 0",
+      "true")
+  }
+
+  @Test
+  def testArithmetic(): Unit = {
+    // math arithmetic
+    testTableApi('f8 - 5, "f8 - 5", "0")
+    testTableApi('f8 + 5, "f8 + 5", "10")
+    testTableApi('f8 / 2, "f8 / 2", "2")
+    testTableApi('f8 * 2, "f8 * 2", "10")
+    testTableApi('f8 % 2, "f8 % 2", "1")
+    testTableApi(-'f8, "-f8", "-5")
+    testTableApi(3.toExpr + 'f8, "3 + f8", "8")
+
+    // boolean arithmetic
+    testTableApi('f6 && true, "f6 && true", "true")
+    testTableApi('f6 && false, "f6 && false", "false")
+    testTableApi('f6 || false, "f6 || false", "true")
+    testTableApi(!'f6, "!f6", "false")
+
+    // comparison
+    testTableApi('f8 > 'f2, "f8 > f2", "true")
+    testTableApi('f8 >= 'f8, "f8 >= f8", "true")
+    testTableApi('f8 < 'f2, "f8 < f2", "false")
+    testTableApi('f8.isNull, "f8.isNull", "false")
+    testTableApi('f8.isNotNull, "f8.isNotNull", "true")
+    testTableApi(12.toExpr <= 'f8, "12 <= f8", "false")
+
+    // string arithmetic
+    testTableApi(42.toExpr + 'f10 + 'f9, "42 + f10 + f9", "42String10")
+    testTableApi('f10 + 'f9, "f10 + f9", "String10")
+  }
+
+  @Test
+  def testOtherExpressions(): Unit = {
+    // null
+    testAllApis(Null(Types.INT), "Null(INT)", "CAST(NULL AS INT)", "null")
+    testAllApis(
+      Null(Types.STRING) === "",
+      "Null(STRING) === ''",
+      "CAST(NULL AS VARCHAR) = ''",
+      "null")
+
+    // if
+    testTableApi(('f6 && true).?("true", "false"), "(f6 && true).?('true', 'false')", "true")
+    testTableApi(false.?("true", "false"), "false.?('true', 'false')", "false")
+    testTableApi(
+      true.?(true.?(true.?(10, 4), 4), 4),
+      "true.?(true.?(true.?(10, 4), 4), 4)",
+      "10")
+    testTableApi(true, "?((f6 && true), 'true', 'false')", "true")
+    testSqlApi("CASE 11 WHEN 1 THEN 'a' ELSE 'b' END", "b")
+    testSqlApi("CASE 2 WHEN 1 THEN 'a' ELSE 'b' END", "b")
+    testSqlApi(
+      "CASE 1 WHEN 1, 2 THEN '1 or 2' WHEN 2 THEN 'not possible' WHEN 3, 2 THEN '3' " +
+      "ELSE 'none of the above' END",
+      "1 or 2")
+    testSqlApi("CASE WHEN 'a'='a' THEN 1 END", "1")
+    testSqlApi("CASE 2 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END", "bcd")
+    testSqlApi("CASE f2 WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END", "11")
+    testSqlApi("CASE f7 WHEN 1 THEN 11 WHEN 2 THEN 4 ELSE NULL END", "null")
+    testSqlApi("CASE 42 WHEN 1 THEN 'a' WHEN 2 THEN 'bcd' END", "null")
+    testSqlApi("CASE 1 WHEN 1 THEN true WHEN 2 THEN false ELSE NULL END", "true")
+
+    // case insensitive as
+    testTableApi(5 as 'test, "5 As test", "5")
+
+    // complex expressions
+    testTableApi('f0.isNull.isNull, "f0.isNull().isNull", "false")
+    testTableApi(
+      'f8.abs() + 'f8.abs().abs().abs().abs(),
+      "f8.abs() + f8.abs().abs().abs().abs()",
+      "10")
+    testTableApi(
+      'f8.cast(Types.STRING) + 'f8.cast(Types.STRING),
+      "f8.cast(STRING) + f8.cast(STRING)",
+      "55")
+    testTableApi('f8.isNull.cast(Types.INT), "CAST(ISNULL(f8), INT)", "0")
+    testTableApi(
+      'f8.cast(Types.INT).abs().isNull === false,
+      "ISNULL(CAST(f8, INT).abs()) === false",
+      "true")
+    testTableApi(
+      (((true === true) || false).cast(Types.STRING) + "X ").trim(),
+      "((((true) === true) || false).cast(STRING) + 'X ').trim",
+      "trueX")
+    testTableApi(12.isNull, "12.isNull", "false")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIfInvalidTypesScala(): Unit = {
+    testTableApi(('f6 && true).?(5, "false"), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIfInvalidTypesJava(): Unit = {
+    testTableApi("FAIL", "(f8 && true).?(5, 'false')", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidStringComparison1(): Unit = {
+    testTableApi("w" === 4, "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidStringComparison2(): Unit = {
+    testTableApi("w" > 4.toExpr, "FAIL", "FAIL")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  def testData = {
+    val testData = new Row(11)
+    testData.setField(0, 1: Byte)
+    testData.setField(1, 1: Short)
+    testData.setField(2, 1)
+    testData.setField(3, 1L)
+    testData.setField(4, 1.0f)
+    testData.setField(5, 1.0d)
+    testData.setField(6, true)
+    testData.setField(7, 0.0d)
+    testData.setField(8, 5)
+    testData.setField(9, 10)
+    testData.setField(10, "String")
+    testData
+  }
+
+  def typeInfo = {
+    new RowTypeInfo(Seq(
+      Types.BYTE,
+      Types.SHORT,
+      Types.INT,
+      Types.LONG,
+      Types.FLOAT,
+      Types.DOUBLE,
+      Types.BOOLEAN,
+      Types.DOUBLE,
+      Types.INT,
+      Types.INT,
+      Types.STRING
+      )).asInstanceOf[TypeInformation[Any]]
+  }
+
+}