You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2016/04/11 18:02:41 UTC

[2/2] flink git commit: [FLINK-3579] Improve String concatenation in the Table API

[FLINK-3579] Improve String concatenation in the Table API

This closes #1821


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e054393
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e054393
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e054393

Branch: refs/heads/master
Commit: 9e054393bbe7190bd239bd2bb926d588aa5d1c6f
Parents: e16ca84
Author: ramkrishna <ra...@gmail.com>
Authored: Mon Mar 21 12:52:31 2016 +0530
Committer: vasia <va...@apache.org>
Committed: Mon Apr 11 18:01:40 2016 +0200

----------------------------------------------------------------------
 .../api/table/plan/RexNodeTranslator.scala      | 14 ++++++-
 .../table/test/StringExpressionsITCase.java     | 43 +++++++++++++++++++-
 2 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e054393/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index 1668efb..b50b74b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -20,9 +20,11 @@ package org.apache.flink.api.table.plan
 
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.tools.RelBuilder.AggCall
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.table.typeutils.TypeConverter
 
@@ -134,7 +136,17 @@ object RexNodeTranslator {
       case Plus(left, right) =>
         val l = toRexNode(left, relBuilder)
         val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
+        if(SqlTypeName.STRING_TYPES.contains(l.getType.getSqlTypeName)) {
+          val cast: RexNode = relBuilder.cast(r,
+            TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO))
+          relBuilder.call(SqlStdOperatorTable.PLUS, l, cast)
+        } else if(SqlTypeName.STRING_TYPES.contains(r.getType.getSqlTypeName)) {
+          val cast: RexNode = relBuilder.cast(l,
+            TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO))
+          relBuilder.call(SqlStdOperatorTable.PLUS, cast, r)
+        } else {
+          relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
+        }
       case Minus(left, right) =>
         val l = toRexNode(left, relBuilder)
         val r = toRexNode(right, relBuilder)

http://git-wip-us.apache.org/repos/asf/flink/blob/9e054393/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index 65f0470..86a3bfd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.TableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -150,7 +149,47 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
 		Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
 		// Must fail because the comparison here is between String(column 'c') and (Integer 10)
-		Table res = in.filter("c > 10" );
+		Table res = in.filter("c > 10");
 		DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
 	}
+
+	@Test
+	public void testStringConcat() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+			new Tuple2<>("ABCD", 3),
+			new Tuple2<>("ABCD", 2));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+			.select("a + b + 42");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "ABCD342\nABCD242";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testStringConcat1() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+			new Tuple2<>("ABCD", 3),
+			new Tuple2<>("ABCD", 2));
+
+		Table in = tableEnv.fromDataSet(ds, "a, b");
+
+		Table result = in
+			.select("42 + b + a");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "44ABCD\n45ABCD";
+		compareResultAsText(results, expected);
+	}
 }