You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2016/04/11 18:02:41 UTC
[2/2] flink git commit: [FLINK-3579] Improve String concatenation in the Table API
[FLINK-3579] Improve String concatenation in the Table API
This closes #1821
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e054393
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e054393
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e054393
Branch: refs/heads/master
Commit: 9e054393bbe7190bd239bd2bb926d588aa5d1c6f
Parents: e16ca84
Author: ramkrishna <ra...@gmail.com>
Authored: Mon Mar 21 12:52:31 2016 +0530
Committer: vasia <va...@apache.org>
Committed: Mon Apr 11 18:01:40 2016 +0200
----------------------------------------------------------------------
.../api/table/plan/RexNodeTranslator.scala | 14 ++++++-
.../table/test/StringExpressionsITCase.java | 43 +++++++++++++++++++-
2 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9e054393/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index 1668efb..b50b74b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -20,9 +20,11 @@ package org.apache.flink.api.table.plan
import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.tools.RelBuilder
import org.apache.calcite.tools.RelBuilder.AggCall
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.table.expressions._
import org.apache.flink.api.table.typeutils.TypeConverter
@@ -134,7 +136,17 @@ object RexNodeTranslator {
case Plus(left, right) =>
val l = toRexNode(left, relBuilder)
val r = toRexNode(right, relBuilder)
- relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
+ if(SqlTypeName.STRING_TYPES.contains(l.getType.getSqlTypeName)) {
+ val cast: RexNode = relBuilder.cast(r,
+ TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO))
+ relBuilder.call(SqlStdOperatorTable.PLUS, l, cast)
+ } else if(SqlTypeName.STRING_TYPES.contains(r.getType.getSqlTypeName)) {
+ val cast: RexNode = relBuilder.cast(l,
+ TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO))
+ relBuilder.call(SqlStdOperatorTable.PLUS, cast, r)
+ } else {
+ relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
+ }
case Minus(left, right) =>
val l = toRexNode(left, relBuilder)
val r = toRexNode(right, relBuilder)
http://git-wip-us.apache.org/repos/asf/flink/blob/9e054393/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
index 65f0470..86a3bfd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -150,7 +149,47 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> tupleDataSet = CollectionDataSets.get3TupleDataSet(env);
Table in = tableEnv.fromDataSet(tupleDataSet, "a, b, c");
// Must fail because the comparison here is between String(column 'c') and (Integer 10)
- Table res = in.filter("c > 10" );
+ Table res = in.filter("c > 10");
DataSet<Row> resultSet = tableEnv.toDataSet(res, Row.class);
}
+
+ @Test
+ public void testStringConcat() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ TableEnvironment tableEnv = new TableEnvironment();
+
+ DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+ new Tuple2<>("ABCD", 3),
+ new Tuple2<>("ABCD", 2));
+
+ Table in = tableEnv.fromDataSet(ds, "a, b");
+
+ Table result = in
+ .select("a + b + 42");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "ABCD342\nABCD242";
+ compareResultAsText(results, expected);
+ }
+
+ @Test
+ public void testStringConcat1() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ TableEnvironment tableEnv = new TableEnvironment();
+
+ DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+ new Tuple2<>("ABCD", 3),
+ new Tuple2<>("ABCD", 2));
+
+ Table in = tableEnv.fromDataSet(ds, "a, b");
+
+ Table result = in
+ .select("42 + b + a");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "44ABCD\n45ABCD";
+ compareResultAsText(results, expected);
+ }
}