Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/08 17:50:52 UTC

[1/5] flink git commit: [FLINK-7971] [table] Fix potential NPE in non-windowed aggregation.

Repository: flink
Updated Branches:
  refs/heads/release-1.4 f5a0b4bdf -> c79432915


[FLINK-7971] [table] Fix potential NPE in non-windowed aggregation.

This closes #4941.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a126bd3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a126bd3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a126bd3e

Branch: refs/heads/release-1.4
Commit: a126bd3e7d9614749f61692fbb53c5b284f17091
Parents: f5a0b4b
Author: Xpray <le...@gmail.com>
Authored: Fri Nov 3 15:19:42 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 18:43:31 2017 +0100

----------------------------------------------------------------------
 .../flink/table/runtime/aggregate/GroupAggProcessFunction.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a126bd3e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 91c379f..3970320 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -97,11 +97,14 @@ class GroupAggProcessFunction(
     if (null == accumulators) {
       firstRow = true
       accumulators = function.createAccumulators()
-      inputCnt = 0L
     } else {
       firstRow = false
     }
 
+    if (null == inputCnt) {
+      inputCnt = 0L
+    }
+
     // Set group keys value to the final output
     function.setForwardedFields(input, newRow.row)
     function.setForwardedFields(input, prevRow.row)

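For context on the fix above: the accumulators and the input counter live in separate state handles, so the counter can be null even when accumulators are present, which is why its initialization was pulled out of the accumulator null-check. A minimal sketch of the resulting pattern, with illustrative state handles standing in for the actual fields of GroupAggProcessFunction:

    import java.lang.{Long => JLong}
    import org.apache.flink.api.common.state.ValueState
    import org.apache.flink.types.Row

    // illustrative helper, not the actual Flink code: each state value is
    // null-checked independently instead of resetting the counter only when
    // the accumulators are missing
    def loadAggState(accState: ValueState[Row], cntState: ValueState[JLong]): (Row, JLong) = {
      var accumulators = accState.value()
      var inputCnt = cntState.value()
      if (null == accumulators) {
        accumulators = new Row(1) // stand-in for function.createAccumulators()
      }
      if (null == inputCnt) { // may be null even when accumulators exist
        inputCnt = 0L
      }
      (accumulators, inputCnt)
    }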

[4/5] flink git commit: [FLINK-8012] [table] Fix TableSink config for tables with time attributes.

Posted by fh...@apache.org.
[FLINK-8012] [table] Fix TableSink config for tables with time attributes.

This closes #4974.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51657fc6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51657fc6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51657fc6

Branch: refs/heads/release-1.4
Commit: 51657fc6deaf28115020db86d031d536b09bf384
Parents: 1b20f70
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Nov 7 17:57:39 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 18:43:57 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/flink/table/api/table.scala     |  7 ++++++-
 .../table/runtime/stream/table/TableSinkITCase.scala | 15 ++++++++++++---
 2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51657fc6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 0430e49..7349a0e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -848,7 +848,12 @@ class Table(
     val rowType = getRelNode.getRowType
     val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
     val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
-      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
+      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+      .map {
+        // replace time indicator types with SQL_TIMESTAMP
+        case t: TypeInformation[_] if FlinkTypeFactory.isTimeIndicatorType(t) => Types.SQL_TIMESTAMP
+        case t: TypeInformation[_] => t
+      }.toArray
 
     // configure the table sink
     val configuredSink = sink.configure(fieldNames, fieldTypes)

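The key change above: time indicator types are internal to the Table API and must not be passed to a TableSink, so they are mapped to plain SQL timestamps before sink.configure(...) is called. A standalone sketch of that substitution (the indicator check is taken as a parameter here, since FlinkTypeFactory.isTimeIndicatorType is not reproduced):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types

    // illustrative stand-in for the mapping applied before configuring the
    // sink: every time indicator field type becomes SQL_TIMESTAMP, all other
    // types pass through unchanged
    def toSinkFieldTypes(
        fieldTypes: Array[TypeInformation[_]],
        isTimeIndicator: TypeInformation[_] => Boolean): Array[TypeInformation[_]] =
      fieldTypes.map {
        case t if isTimeIndicator(t) => Types.SQL_TIMESTAMP
        case t => t
      }
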
http://git-wip-us.apache.org/repos/asf/flink/blob/51657fc6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 07934b8..b44d8ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -88,13 +88,16 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setParallelism(4)
 
     val input = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._2)
       .map(x => x).setParallelism(4) // increase DOP to 4
 
-    val results = input.toTable(tEnv, 'a, 'b, 'c)
+    val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c)
       .where('a < 5 || 'a > 17)
       .select('c, 'b)
       .writeToSink(new CsvTableSink(path))
@@ -102,8 +105,14 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
     env.execute()
 
     val expected = Seq(
-      "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
-      "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
+      "Hi,1970-01-01 00:00:00.001",
+      "Hello,1970-01-01 00:00:00.002",
+      "Hello world,1970-01-01 00:00:00.002",
+      "Hello world, how are you?,1970-01-01 00:00:00.003",
+      "Comment#12,1970-01-01 00:00:00.006",
+      "Comment#13,1970-01-01 00:00:00.006",
+      "Comment#14,1970-01-01 00:00:00.006",
+      "Comment#15,1970-01-01 00:00:00.006").mkString("\n")
 
     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }


[2/5] flink git commit: [FLINK-7922] [table] Fix FlinkTypeFactory.leastRestrictive for composite types.

Posted by fh...@apache.org.
[FLINK-7922] [table] Fix FlinkTypeFactory.leastRestrictive for composite types.

This closes #4929.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c60f97a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c60f97a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c60f97a

Branch: refs/heads/release-1.4
Commit: 8c60f97a43defc57bb1bfaabdd6081b329db53b8
Parents: a126bd3
Author: Rong Rong <ro...@uber.com>
Authored: Tue Oct 31 11:05:38 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 18:43:39 2017 +0100

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 39 +++++----
 .../table/api/batch/sql/SetOperatorsTest.scala  | 55 +++++++++++++
 .../api/batch/table/SetOperatorsTest.scala      | 59 ++++++++++++++
 .../flink/table/api/stream/sql/UnionTest.scala  | 83 ++++++++++++++++++++
 .../table/expressions/ScalarOperatorsTest.scala |  5 +-
 .../utils/ScalarOperatorsTestBase.scala         |  9 ++-
 .../stream/table/SetOperatorsITCase.scala       | 20 ++++-
 .../table/runtime/utils/CommonTestData.scala    |  6 ++
 .../flink/table/utils/TableTestBase.scala       | 26 ++++++
 9 files changed, 280 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 2874e61..04fab76 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -268,30 +268,37 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   override def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
     val type0 = types.get(0)
     if (type0.getSqlTypeName != null) {
-      val resultType = resolveAny(types)
-      if (resultType != null) {
-        return resultType
+      val resultType = resolveAllIdenticalTypes(types)
+      if (resultType.isDefined) {
+        // result type for identical types
+        return resultType.get
       }
     }
+    // fall back to super
     super.leastRestrictive(types)
   }
 
-  private def resolveAny(types: util.List[RelDataType]): RelDataType = {
+  private def resolveAllIdenticalTypes(types: util.List[RelDataType]): Option[RelDataType] = {
     val allTypes = types.asScala
-    val hasAny = allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)
-    if (hasAny) {
-      val head = allTypes.head
-      // only allow ANY with exactly the same GenericRelDataType for all types
-      if (allTypes.forall(_ == head)) {
-        val nullable = allTypes.exists(
-          sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL
-        )
-        createTypeWithNullability(head, nullable)
-      } else {
+
+    val head = allTypes.head
+    // check if all types are the same
+    if (allTypes.forall(_ == head)) {
+      // types are the same, check nullability
+      val nullable = allTypes
+        .exists(sqlType => sqlType.isNullable || sqlType.getSqlTypeName == SqlTypeName.NULL)
+      // return type with nullability
+      Some(createTypeWithNullability(head, nullable))
+    } else {
+      // types are not all the same
+      if (allTypes.exists(_.getSqlTypeName == SqlTypeName.ANY)) {
+        // one of the types was ANY.
+        // we cannot generate a common type if it differs from other types.
         throw TableException("Generic ANY types must have a common type information.")
+      } else {
+        // cannot resolve a common type for different input types
+        None
       }
-    } else {
-      null
     }
   }
 }

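To summarize the new resolution logic: if all input types are identical, the common type is that type with nullability widened across the inputs; if the types differ and one of them is a generic ANY type, resolution fails hard; otherwise the decision is deferred to Calcite's default. A toy model of the control flow (Calcite's RelDataType replaced by a small case class for illustration):

    // not the actual Calcite types; this models the decision only
    case class SimpleType(name: String, nullable: Boolean)

    def resolveIdentical(types: Seq[SimpleType]): Option[SimpleType] = {
      val head = types.head
      if (types.forall(_.name == head.name)) {
        // identical types: the result is nullable if any input is nullable
        Some(head.copy(nullable = types.exists(_.nullable)))
      } else if (types.exists(_.name == "ANY")) {
        // differing types that include a generic ANY cannot be unified
        throw new RuntimeException("Generic ANY types must have a common type information.")
      } else {
        None // defer to Calcite's leastRestrictive
      }
    }
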
http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
index bc9b453..bff0b78 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/SetOperatorsTest.scala
@@ -18,8 +18,11 @@
 
 package org.apache.flink.table.api.batch.sql
 
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.{Ignore, Test}
@@ -178,4 +181,56 @@ class SetOperatorsTest extends TableTestBase {
       expected
     )
   }
+
+  @Test
+  def testUnionNullableTypes(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "a")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "CASE(>(c, 0), b, null) AS EXPR$0")
+      ),
+      term("union", "a")
+    )
+
+    util.verifySql(
+      "SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A",
+      expected
+    )
+  }
+
+  @Test
+  def testUnionAnyType(): Unit = {
+    val util = batchTestUtil()
+    val typeInfo = Types.ROW(
+      new GenericTypeInfo(classOf[NonPojo]),
+      new GenericTypeInfo(classOf[NonPojo]))
+    util.addJavaTable(typeInfo, "A", "a, b")
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "a")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "b")
+      ),
+      term("union", "a")
+    )
+
+    util.verifyJavaSql("SELECT a FROM A UNION ALL SELECT b FROM A", expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 686973e..2d4e205 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -20,8 +20,12 @@ package org.apache.flink.table.api.batch.table
 
 import java.sql.Timestamp
 
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Null
+import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
 import org.junit.Test
@@ -76,4 +80,59 @@ class SetOperatorsTest extends TableTestBase {
 
     util.verifyTable(in, expected)
   }
+
+  @Test
+  def testUnionNullableTypes(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+
+    val in = t.select('a)
+      .unionAll(
+        t.select(('c > 0) ? ('b, Null(createTypeInformation[(Int, String)]))))
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "a")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "CASE(>(c, 0), b, null) AS _c0")
+      ),
+      term("union", "a")
+    )
+
+    util.verifyTable(in, expected)
+  }
+
+  @Test
+  def testUnionAnyType(): Unit = {
+    val util = batchTestUtil()
+    val typeInfo = Types.ROW(
+      new GenericTypeInfo(classOf[NonPojo]),
+      new GenericTypeInfo(classOf[NonPojo]))
+    val t = util.addJavaTable(typeInfo, "A", "a, b")
+
+    val in = t.select('a).unionAll(t.select('b))
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "a")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "b")
+      ),
+      term("union", "a")
+    )
+
+    util.verifyJavaTable(in, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
new file mode 100644
index 0000000..7e807f6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/UnionTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class UnionTest extends TableTestBase {
+
+  @Test
+  def testUnionAllNullableCompositeType(): Unit = {
+    val streamUtil = streamTestUtil()
+    streamUtil.addTable[((Int, String), (Int, String), Int)]("A", 'a, 'b, 'c)
+
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a")
+      ),
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "CASE(>(c, 0), b, null) AS EXPR$0")
+      ),
+      term("union all", "a")
+    )
+
+    streamUtil.verifySql(
+      "SELECT a FROM A UNION ALL SELECT CASE WHEN c > 0 THEN b ELSE NULL END FROM A",
+      expected
+    )
+  }
+
+  @Test
+  def testUnionAnyType(): Unit = {
+    val streamUtil = streamTestUtil()
+    val typeInfo = Types.ROW(
+      new GenericTypeInfo(classOf[NonPojo]),
+      new GenericTypeInfo(classOf[NonPojo]))
+    streamUtil.addJavaTable(typeInfo, "A", "a, b")
+
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "a")
+      ),
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "b")
+      ),
+      term("union all", "a")
+    )
+
+    streamUtil.verifyJavaSql("SELECT a FROM A UNION ALL SELECT b FROM A", expected)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index cbdce8b..6dd2afc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -211,7 +211,6 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
 
   @Test
   def testOtherExpressions(): Unit = {
-
     // nested field null type
     testSqlApi("CASE WHEN f13.f1 IS NULL THEN 'a' ELSE 'b' END", "a")
     testSqlApi("CASE WHEN f13.f1 IS NOT NULL THEN 'a' ELSE 'b' END", "b")
@@ -222,6 +221,10 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
     testAllApis('f13.get("f1").isNull, "f13.get('f1').isNull", "f13.f1 IS NULL", "true")
     testAllApis('f13.get("f1").isNotNull, "f13.get('f1').isNotNull", "f13.f1 IS NOT NULL", "false")
 
+    // array element access test
+    testSqlApi("CASE WHEN f18 IS NOT NULL THEN f18[1] ELSE NULL END", "1")
+    testSqlApi("CASE WHEN f19 IS NOT NULL THEN f19[1] ELSE NULL END", "(1,a)")
+
     // boolean literals
     testAllApis(
       true,

http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
index 149d8c1..3eeb215 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
@@ -22,6 +22,7 @@ import java.sql.Date
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala.createTypeInformation
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.types.Row
@@ -29,7 +30,7 @@ import org.apache.flink.types.Row
 class ScalarOperatorsTestBase extends ExpressionTestBase {
 
   def testData: Row = {
-    val testData = new Row(18)
+    val testData = new Row(20)
     testData.setField(0, 1: Byte)
     testData.setField(1, 1: Short)
     testData.setField(2, 1)
@@ -48,6 +49,8 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
     testData.setField(15, Date.valueOf("1996-11-10"))
     testData.setField(16, BigDecimal("0.00000000").bigDecimal)
     testData.setField(17, BigDecimal("10.0").bigDecimal)
+    testData.setField(18, Array[Integer](1,2))
+    testData.setField(19, Array[(Int, String)]((1,"a"), (2, "b")))
     testData
   }
 
@@ -70,7 +73,9 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
       Types.STRING,
       Types.SQL_DATE,
       Types.DECIMAL,
-      Types.DECIMAL
+      Types.DECIMAL,
+      Types.OBJECT_ARRAY(Types.INT),
+      Types.OBJECT_ARRAY(createTypeInformation[(Int, String)])
       ).asInstanceOf[TypeInformation[Any]]
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
index cf195a5..5e15e14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/SetOperatorsITCase.scala
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -88,9 +89,22 @@ class SetOperatorsITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-  class NonPojo {
-    val x = new java.util.HashMap[String, String]()
+  @Test
+  def testUnionWithCompositeType(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val s1 = env.fromElements((1, (1, "a")), (2, (2, "b")))
+      .toTable(tEnv, 'a, 'b)
+    val s2 = env.fromElements(((3, "c"), 3), ((4, "d"), 4))
+      .toTable(tEnv, 'a, 'b)
 
-    override def toString: String = x.toString
+    val result = s1.unionAll(s2.select('b, 'a)).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1,(1,a)", "2,(2,b)", "3,(3,c)", "4,(4,d)")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index e8568ca..9223887 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -176,4 +176,10 @@ object CommonTestData {
       this(null, null)
     }
   }
+
+  class NonPojo {
+    val x = new java.util.HashMap[String, String]()
+
+    override def toString: String = x.toString
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c60f97a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 0a0d12e..4042f50 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -190,6 +190,19 @@ case class BatchTableTestUtil() extends TableTestUtil {
       actual.split("\n").map(_.trim).mkString("\n"))
   }
 
+  def verifyJavaSql(query: String, expected: String): Unit = {
+    verifyJavaTable(javaTableEnv.sqlQuery(query), expected)
+  }
+
+  def verifyJavaTable(resultTable: Table, expected: String): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = javaTableEnv.optimize(relNode)
+    val actual = RelOptUtil.toString(optimized)
+    assertEquals(
+      expected.split("\n").map(_.trim).mkString("\n"),
+      actual.split("\n").map(_.trim).mkString("\n"))
+  }
+
   def printTable(resultTable: Table): Unit = {
     val relNode = resultTable.getRelNode
     val optimized = tableEnv.optimize(relNode)
@@ -268,6 +281,19 @@ case class StreamTableTestUtil() extends TableTestUtil {
       actual.split("\n").map(_.trim).mkString("\n"))
   }
 
+  def verifyJavaSql(query: String, expected: String): Unit = {
+    verifyJavaTable(javaTableEnv.sqlQuery(query), expected)
+  }
+
+  def verifyJavaTable(resultTable: Table, expected: String): Unit = {
+    val relNode = resultTable.getRelNode
+    val optimized = javaTableEnv.optimize(relNode, updatesAsRetraction = false)
+    val actual = RelOptUtil.toString(optimized)
+    assertEquals(
+      expected.split("\n").map(_.trim).mkString("\n"),
+      actual.split("\n").map(_.trim).mkString("\n"))
+  }
+
   // the print methods are for debugging purposes only
   def printTable(resultTable: Table): Unit = {
     val relNode = resultTable.getRelNode


[5/5] flink git commit: [FLINK-8002] [table] Fix join window boundary for LESS_THAN and GREATER_THAN predicates.

Posted by fh...@apache.org.
[FLINK-8002] [table] Fix join window boundary for LESS_THAN and GREATER_THAN predicates.

This closes #4962.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7943291
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7943291
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7943291

Branch: refs/heads/release-1.4
Commit: c7943291599260003304f003e89725352ae7d836
Parents: 51657fc
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Nov 6 21:22:35 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 18:44:06 2017 +0100

----------------------------------------------------------------------
 .../table/runtime/join/WindowJoinUtil.scala     |  8 +++--
 .../flink/table/api/stream/sql/JoinTest.scala   | 38 ++++++++++++++++++--
 2 files changed, 42 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c7943291/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 1693c41..7006476 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -346,10 +346,14 @@ object WindowJoinUtil {
       leftLiteral.get - rightLiteral.get
     }
     val boundary = timePred.pred.getKind match {
-      case SqlKind.LESS_THAN =>
+      case SqlKind.LESS_THAN if timePred.leftInputOnLeftSide =>
         tmpTimeOffset - 1
-      case SqlKind.GREATER_THAN =>
+      case SqlKind.LESS_THAN if !timePred.leftInputOnLeftSide =>
         tmpTimeOffset + 1
+      case SqlKind.GREATER_THAN if timePred.leftInputOnLeftSide =>
+        tmpTimeOffset + 1
+      case SqlKind.GREATER_THAN if !timePred.leftInputOnLeftSide =>
+        tmpTimeOffset - 1
       case _ =>
         tmpTimeOffset
     }

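The bug fixed above: the one-millisecond adjustment that turns a strict comparison into an inclusive bound ignored which input stood on the left of the predicate, so mirrored conditions produced different windows. A self-contained model of the corrected computation (SqlKind reduced to a small enumeration for illustration):

    sealed trait CmpKind
    case object LessThan extends CmpKind
    case object GreaterThan extends CmpKind
    case object NonStrict extends CmpKind // <=, >=, =

    // mirrors the corrected match: the one-millisecond shift that makes a
    // strict bound inclusive depends on which input is on the left side
    def boundary(tmpTimeOffset: Long, kind: CmpKind, leftInputOnLeftSide: Boolean): Long =
      kind match {
        case LessThan if leftInputOnLeftSide => tmpTimeOffset - 1
        case LessThan => tmpTimeOffset + 1
        case GreaterThan if leftInputOnLeftSide => tmpTimeOffset + 1
        case GreaterThan => tmpTimeOffset - 1
        case NonStrict => tmpTimeOffset
      }

With this, the new test case "t1.c > t2.c - interval '2' second and t1.c < t2.c + interval '2' second" and its mirrored form both yield the same inclusive bounds, -1999 and 1999.
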
http://git-wip-us.apache.org/repos/asf/flink/blob/c7943291/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index ded5c51..8c1865c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -379,14 +379,27 @@ class JoinTest extends TableTestBase {
       "rowtime")
 
     verifyTimeBoundary(
-      "t1.c - interval '2' second >= t2.c + interval '1' second -" +
-        "interval '10' second and " +
+      "t1.c >= t2.c - interval '1' second and " +
+        "t1.c <= t2.c + interval '10' second",
+      -1000,
+      10000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c - interval '2' second >= t2.c + interval '1' second - interval '10' second and " +
         "t1.c <= t2.c + interval '10' second",
       -7000,
       10000,
       "rowtime")
 
     verifyTimeBoundary(
+      "t2.c + interval '1' second - interval '10' second <= t1.c - interval '2' second and " +
+        "t2.c + interval '10' second >= t1.c",
+      -7000,
+      10000,
+      "rowtime")
+
+    verifyTimeBoundary(
       "t1.c >= t2.c - interval '10' second and " +
         "t1.c <= t2.c - interval '5' second",
       -10000,
@@ -394,6 +407,27 @@ class JoinTest extends TableTestBase {
       "rowtime")
 
     verifyTimeBoundary(
+      "t2.c - interval '10' second <= t1.c and " +
+        "t2.c - interval '5' second >= t1.c",
+      -10000,
+      -5000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c > t2.c - interval '2' second and " +
+        "t1.c < t2.c + interval '2' second",
+      -1999,
+      1999,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t2.c > t1.c - interval '2' second and " +
+        "t2.c < t1.c + interval '2' second",
+      -1999,
+      1999,
+      "rowtime")
+
+    verifyTimeBoundary(
       "t1.c = t2.c",
       0,
       0,


[3/5] flink git commit: [FLINK-7996] [table] Add support for (left.time = right.time) predicates to window join.

Posted by fh...@apache.org.
[FLINK-7996] [table] Add support for (left.time = right.time) predicates to window join.

This closes #4977.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b20f70d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b20f70d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b20f70d

Branch: refs/heads/release-1.4
Commit: 1b20f70dea3fddfaeaf00ceae44e4dc0fcb4f47b
Parents: 8c60f97
Author: Xingcan Cui <xi...@gmail.com>
Authored: Wed Nov 8 01:17:57 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Nov 8 18:43:51 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sql.md                           | 17 ++--
 docs/dev/table/tableApi.md                      | 26 +++---
 .../table/runtime/join/WindowJoinUtil.scala     | 95 ++++++++++++--------
 .../flink/table/api/stream/sql/JoinTest.scala   | 66 ++++++++++++++
 .../sql/validation/JoinValidationTest.scala     | 29 ++++++
 .../flink/table/api/stream/table/JoinTest.scala | 34 +++++++
 .../table/runtime/stream/sql/JoinITCase.scala   | 47 ++++++++++
 7 files changed, 259 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 2318271..3097d9e 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -400,14 +400,15 @@ FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires at least one equi-join predicate and a special join 
-        condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
-          <ul>
-            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
-            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
-          </ul>
-        </p>
-
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>), a <code>BETWEEN</code> predicate, or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
+        <p>For example, the following predicates are valid window join conditions:</p>
+          
+        <ul>
+          <li><code>ltime = rtime</code></li>
+          <li><code>ltime &gt;= rtime AND ltime &lt; rtime + INTERVAL '10' MINUTE</code></li>
+          <li><code>ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND</code></li>
+        </ul>
+                
         <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
 
 {% highlight sql %}

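To make the documented conditions concrete, here is a hedged example of a complete time-windowed join issued as SQL through the Table API; the table and column names ("Orders", "Shipments", oTime, sTime) are invented for illustration:

    import org.apache.flink.table.api.Table
    import org.apache.flink.table.api.scala.StreamTableEnvironment

    // assumes Orders ('orderId, 'amount, 'oTime.rowtime) and
    // Shipments ('orderId, 'sTime.rowtime) were registered on tEnv beforehand
    def timeWindowedJoin(tEnv: StreamTableEnvironment): Table =
      tEnv.sqlQuery(
        """
          |SELECT o.orderId, o.amount, s.sTime
          |FROM Orders o, Shipments s
          |WHERE o.orderId = s.orderId AND
          |  o.oTime BETWEEN s.sTime - INTERVAL '10' SECOND AND s.sTime + INTERVAL '5' SECOND
          |""".stripMargin)
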
http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 7cce042..f5a2059 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -527,13 +527,14 @@ Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
-          <ul>
-            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
-            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
-          </ul>
-        </p>
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
+        <p>For example, the following predicates are valid window join conditions:</p>
 
+        <ul>
+          <li><code>ltime === rtime</code></li>
+          <li><code>ltime &gt;= rtime &amp;&amp; ltime &lt; rtime + 10.minutes</code></li>
+        </ul>
+        
         <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
 
 {% highlight java %}
@@ -644,13 +645,14 @@ val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
       <td>
         <p><b>Note:</b> Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.</p>
 
-        <p>A time-windowed join requires at least one equi-join predicate and a special join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a <code>BETWEEN</code> predicate (which is not available in Table API yet) that compares the <a href="streaming.html#time-attributes">time attributes</a> of both input tables. The following rules apply for time predicates:
-          <ul>
-            <li>The time attribute of a table must be compared to a bounded interval on a time attribute of the opposite table.</li>
-            <li>The compared time attributes must be of the same type, i.e., both are processing time or event time.</li>
-          </ul>
-        </p>
+        <p>A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<code>&lt;, &lt;=, &gt;=, &gt;</code>) or a single equality predicate that compares <a href="streaming.html#time-attributes">time attributes</a> of the same type (i.e., processing time or event time) of both input tables.</p> 
+        <p>For example, the following predicates are valid window join conditions:</p>
 
+        <ul>
+          <li><code>'ltime === 'rtime</code></li>
+          <li><code>'ltime &gt;= 'rtime &amp;&amp; 'ltime &lt; 'rtime + 10.minutes</code></li>
+        </ul>
+        
         <p><b>Note:</b> Currently, only <code>INNER</code> time-windowed joins are supported.</p>
 
 {% highlight scala %}

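And the Scala Table API counterpart, showing both accepted shapes of the time condition; left and right are assumed to expose fields 'a/'ltime and 'd/'rtime as in the documentation snippets above:

    import org.apache.flink.table.api.Table
    import org.apache.flink.table.api.scala._

    // illustrative only: one equality time predicate, or two range predicates
    def windowJoins(left: Table, right: Table): (Table, Table) = {
      val equiJoin = left.join(right)
        .where('a === 'd && 'ltime === 'rtime)
        .select('a, 'ltime)
      val rangeJoin = left.join(right)
        .where('a === 'd && 'ltime >= 'rtime && 'ltime < 'rtime + 10.minutes)
        .select('a, 'ltime)
      (equiJoin, rangeJoin)
    }
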
http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 863f342..1693c41 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -78,29 +78,45 @@ object WindowJoinUtil {
     // Converts the condition to conjunctive normal form (CNF)
     val cnfCondition = RexUtil.toCnf(rexBuilder, predicate)
 
-    // split the condition into time indicator condition and other condition
+    // split the condition into time predicates and other predicates
+    // We need two range predicates or an equality predicate for a properly bounded window join.
     val (timePreds, otherPreds) = cnfCondition match {
-        // We need at least two comparison predicates for a properly bounded window join.
-        // So we need an AND expression for a valid window join.
-        case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
-          c.getOperands.asScala
-            .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
-            .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
-              analyzed match {
-                case Left(timePred) => (preds._1 :+ timePred, preds._2)
-                case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
-              }
-            })
-        case _ =>
-          // No valid window bounds. A windowed stream join requires two comparison predicates that
-          // bound the time in both directions.
-          return (None, Some(predicate))
+      case c: RexCall if cnfCondition.getKind == SqlKind.AND =>
+        // extract all time predicates from conjunctive predicate
+        c.getOperands.asScala
+          .map(identifyTimePredicate(_, leftLogicalFieldCnt, inputSchema))
+          .foldLeft((Seq[TimePredicate](), Seq[RexNode]()))((preds, analyzed) => {
+            analyzed match {
+              case Left(timePred) => (preds._1 :+ timePred, preds._2)
+              case Right(otherPred) => (preds._1, preds._2 :+ otherPred)
+            }
+          })
+      case c: RexCall =>
+        // extract time predicate if it exists
+        identifyTimePredicate(c, leftLogicalFieldCnt, inputSchema) match {
+          case Left(timePred) => (Seq[TimePredicate](timePred), Seq[RexNode]())
+          case Right(otherPred) => (Seq[TimePredicate](), Seq[RexNode](otherPred))
+        }
+      case _ =>
+        // No valid window bounds.
+        return (None, Some(predicate))
     }
 
-    if (timePreds.size != 2) {
-      // No valid window bounds. A windowed stream join requires two comparison predicates that
-      // bound the time in both directions.
-      return (None, Some(predicate))
+    timePreds match {
+      case Seq() =>
+        return (None, Some(predicate))
+      case Seq(t) if t.pred.getKind != SqlKind.EQUALS =>
+        // single predicate must be equality predicate
+        return (None, Some(predicate))
+      case s@Seq(_, _) if s.exists(_.pred.getKind == SqlKind.EQUALS) =>
+        // a pair of range predicates must not include an equality predicate
+        return (None, Some(predicate))
+      case Seq(_) =>
+        // Single equality predicate is OK
+      case Seq(_, _) =>
+        // Two range (i.e., non-equality) predicates are OK.
+      case _ =>
+        return (None, Some(predicate))
     }
 
     // assemble window bounds from predicates
@@ -108,9 +124,14 @@ object WindowJoinUtil {
     val (leftLowerBound, leftUpperBound) =
       streamTimeOffsets match {
         case Seq(Some(x: WindowBound), Some(y: WindowBound)) if x.isLeftLower && !y.isLeftLower =>
+          // two range predicates
           (x.bound, y.bound)
         case Seq(Some(x: WindowBound), Some(y: WindowBound)) if !x.isLeftLower && y.isLeftLower =>
+          // two range predicates
           (y.bound, x.bound)
+        case Seq(Some(x: WindowBound)) =>
+          // single equality predicate
+          (x.bound, x.bound)
         case _ =>
          // Window join requires two comparison predicates that bound the time in both directions.
           return (None, Some(predicate))
@@ -118,12 +139,12 @@ object WindowJoinUtil {
 
     // compose the remain condition list into one condition
     val remainCondition =
-    otherPreds match {
-      case Seq() =>
-        None
-      case _ =>
-        Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
-    }
+      otherPreds match {
+        case Seq() =>
+          None
+        case _ =>
+          Some(otherPreds.reduceLeft((l, r) => RelOptUtil.andJoinFilters(rexBuilder, l, r)))
+      }
 
     val bounds = if (timePreds.head.leftInputOnLeftSide) {
       Some(WindowBounds(
@@ -146,14 +167,15 @@ object WindowJoinUtil {
 
   /**
     * Analyzes a predicate and identifies whether it is a valid predicate for a window join.
-    * A valid window join predicate is a comparison predicate (<, <=, =>, >) that accesses
-    * time attributes of both inputs, each input on a different side of the condition.
+    *
+    * A valid window join predicate is a range or equality predicate (<, <=, ==, >=, >) that
+    * accesses time attributes of both inputs, each input on a different side of the condition.
     * Both accessed time attributes must be of the same time type, i.e., row-time or proc-time.
     *
     * Examples:
     * - left.rowtime > right.rowtime + 2.minutes => valid
+    * - left.rowtime == right.rowtime => valid
     * - left.proctime < right.rowtime + 2.minutes => invalid: different time type
-    * - left.rowtime == right.rowtime + 2.minutes => invalid: not a comparison predicate
     * - left.rowtime - right.rowtime < 2.minutes => invalid: both time attributes on same side
     *
     * If the predicate is a regular join predicate, i.e., it accesses no time attribute it is
@@ -172,7 +194,8 @@ object WindowJoinUtil {
           case SqlKind.GREATER_THAN |
                SqlKind.GREATER_THAN_OR_EQUAL |
                SqlKind.LESS_THAN |
-               SqlKind.LESS_THAN_OR_EQUAL =>
+               SqlKind.LESS_THAN_OR_EQUAL |
+               SqlKind.EQUALS =>
 
             val leftTerm = c.getOperands.get(0)
             val rightTerm = c.getOperands.get(1)
@@ -235,7 +258,7 @@ object WindowJoinUtil {
     *
     * @return A Seq of all time attribute accessed in the expression.
     */
-  def extractTimeAttributeAccesses(
+  private def extractTimeAttributeAccesses(
       expr: RexNode,
       leftFieldCount: Int,
       inputType: RelDataType): Seq[TimeAttributeAccess] = {
@@ -248,9 +271,9 @@ object WindowJoinUtil {
           case t: TimeIndicatorRelDataType =>
             // time attribute access. Remember time type and side of input
             if (idx < leftFieldCount) {
-              Seq(TimeAttributeAccess(t.isEventTime, true, idx))
+              Seq(TimeAttributeAccess(t.isEventTime, isLeftInput = true, idx))
             } else {
-              Seq(TimeAttributeAccess(t.isEventTime, false, idx - leftFieldCount))
+              Seq(TimeAttributeAccess(t.isEventTime, isLeftInput = false, idx - leftFieldCount))
             }
           case _ =>
             // not a time attribute access.
@@ -272,7 +295,7 @@ object WindowJoinUtil {
     * @param inputType The input type of the expression.
     * @return True, if the expression accesses a non-time attribute. False otherwise.
     */
-  def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
+  private def accessesNonTimeAttribute(expr: RexNode, inputType: RelDataType): Boolean = {
     expr match {
       case i: RexInputRef =>
         val accessedType = inputType.getFieldList.get(i.getIndex).getType
@@ -292,7 +315,7 @@ object WindowJoinUtil {
     *
     * @return window boundary, is left lower bound
     */
-  def computeWindowBoundFromPredicate(
+  private def computeWindowBoundFromPredicate(
       timePred: TimePredicate,
       rexBuilder: RexBuilder,
       config: TableConfig): Option[WindowBound] = {
@@ -303,6 +326,8 @@ object WindowJoinUtil {
           timePred.leftInputOnLeftSide
         case (SqlKind.LESS_THAN | SqlKind.LESS_THAN_OR_EQUAL) =>
           !timePred.leftInputOnLeftSide
+        case (SqlKind.EQUALS) =>
+          true // We don't care about this since there's only one bound value.
         case _ =>
           return None
       }

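The extraction now accepts a single top-level predicate in addition to a conjunction, and the subsequent match defines which time-predicate combinations qualify as window bounds: exactly one equality predicate, or exactly two range predicates. A condensed model of that validation (TimePredicate reduced to its comparison kind for illustration):

    sealed trait TimePredKind
    case object EqualsPred extends TimePredKind
    case object RangePred extends TimePredKind // <, <=, >=, >

    // illustrative only: mirrors the match on timePreds above
    def formsValidWindowBounds(timePreds: Seq[TimePredKind]): Boolean =
      timePreds match {
        case Seq(EqualsPred) => true           // single equality: lower == upper bound
        case Seq(RangePred, RangePred) => true // two ranges bound both directions
        case _ => false                        // otherwise: no window, keep as join filter
      }
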
http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 53aff82..ded5c51 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -185,6 +185,66 @@ class JoinTest extends TableTestBase {
   }
 
   @Test
+  def testJoinWithEquiProcTime(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.proctime = t2.proctime
+        |""".stripMargin
+
+    val expected =
+      unaryNode("DataStreamCalc",
+        binaryNode("DataStreamWindowJoin",
+          unaryNode("DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "proctime")
+          ),
+          unaryNode("DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "proctime")
+          ),
+          term("where", "AND(=(a, a0), =(proctime, proctime0))"),
+          term("join", "a", "proctime", "a0", "b", "proctime0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b0 AS b")
+      )
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testJoinWithEquiRowTime(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT t1.a, t2.b
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.c = t2.c
+        |""".stripMargin
+
+    val expected =
+      unaryNode("DataStreamCalc",
+        binaryNode("DataStreamWindowJoin",
+          unaryNode("DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c")
+          ),
+          unaryNode("DataStreamCalc",
+            streamTableNode(1),
+            term("select", "a", "b", "c")
+          ),
+          term("where", "AND(=(a, a0), =(c, c0))"),
+          term("join", "a", "c", "a0", "b", "c0"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "b0 AS b")
+      )
+    streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
   def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {
 
     val sqlQuery =
@@ -332,6 +392,12 @@ class JoinTest extends TableTestBase {
       -10000,
       -5000,
       "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c = t2.c",
+      0,
+      0,
+      "rowtime")
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
index 9cce37e..9f7078c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/JoinValidationTest.scala
@@ -92,4 +92,33 @@ class JoinValidationTest extends TableTestBase {
     streamUtil.verifySql(sql, "n/a")
   }
 
+  /** Validates that a combination of range and equality predicates is not accepted. **/
+  @Test(expected = classOf[TableException])
+  def testRangeAndEqualityPredicates(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.proctime > t2.proctime - INTERVAL '5' SECOND AND
+        |  t1.proctime = t2.proctime
+        | """.stripMargin
+
+    streamUtil.verifySql(sql, "n/a")
+  }
+
+  /** Validates that an equality predicate with an offset is not accepted. **/
+  @Test(expected = classOf[TableException])
+  def testEqualityPredicateWithOffset(): Unit = {
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable t1, MyTable2 t2
+        |WHERE t1.a = t2.a AND
+        |  t1.proctime = t2.proctime - INTERVAL '5' SECOND
+        | """.stripMargin
+
+    streamUtil.verifySql(sql, "n/a")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
index 07e879f..79b413c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
@@ -100,6 +100,40 @@ class JoinTest extends TableTestBase {
     util.verifyTable(resultTable, expected)
   }
 
+  @Test
+  def testProcTimeWindowInnerJoinWithEquiTimeAttrs(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Long, Int, String)]('a, 'b, 'c, 'ltime.proctime)
+    val right = util.addTable[(Long, Int, String)]('d, 'e, 'f, 'rtime.proctime)
+
+    val resultTable = left.join(right)
+      .where('a === 'd && 'ltime === 'rtime)
+      .select('a, 'e, 'ltime)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        binaryNode(
+          "DataStreamWindowJoin",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "ltime")
+          ),
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(1),
+            term("select", "d", "e", "rtime")
+          ),
+          term("where", "AND(=(a, d), =(ltime, rtime))"),
+          term("join", "a", "ltime", "d", "e", "rtime"),
+          term("joinType", "InnerJoin")
+        ),
+        term("select", "a", "e", "PROCTIME(ltime) AS ltime")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
   /**
     * The time indicator can be accessed from non-time predicates now.
     */

http://git-wip-us.apache.org/repos/asf/flink/blob/1b20f70d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 1d7bab6..85929e8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -187,6 +187,53 @@ class JoinITCase extends StreamingWithStateTestBase {
     StreamITCase.compareWithList(expected)
   }
 
+  /** test rowtime inner join with equi-times **/
+  @Test
+  def testRowTimeInnerJoinWithEquiTimeAttrs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStateBackend(getStateBackend)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    StreamITCase.clear
+
+    val sqlQuery =
+      """
+        |SELECT t2.key, t2.id, t1.id
+        |FROM T1 as t1 join T2 as t2 ON
+        |  t1.key = t2.key AND
+        |  t2.rt = t1.rt
+        |""".stripMargin
+
+    val data1 = new mutable.MutableList[(Int, Long, String, Long)]
+
+    data1.+=((4, 4000L, "A", 4000L))
+    data1.+=((5, 5000L, "A", 5000L))
+    data1.+=((6, 6000L, "A", 6000L))
+    data1.+=((6, 6000L, "B", 6000L))
+
+    val data2 = new mutable.MutableList[(String, String, Long)]
+    data2.+=(("A", "R-5", 5000L))
+    data2.+=(("B", "R-6", 6000L))
+
+    val t1 = env.fromCollection(data1)
+      .assignTimestampsAndWatermarks(new Row4WatermarkExtractor)
+      .toTable(tEnv, 'id, 'tm, 'key, 'rt.rowtime)
+    val t2 = env.fromCollection(data2)
+      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+      .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+
+    tEnv.registerTable("T1", t1)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = new java.util.ArrayList[String]
+    expected.add("A,R-5,5")
+    expected.add("B,R-6,6")
+    StreamITCase.compareWithList(expected)
+  }
+
   /** test rowtime inner join with other conditions **/
   @Test
   def testRowTimeInnerJoinWithOtherConditions(): Unit = {