You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/07/05 19:49:36 UTC

[1/4] flink git commit: [FLINK-9681] [table] Enforce 5 minute difference between minRetentionTime and maxRetentionTime.

Repository: flink
Updated Branches:
  refs/heads/master ac5572645 -> cc595354e


[FLINK-9681] [table] Enforce 5 minute difference between minRetentionTime and maxRetentionTime.

This closes #6255.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfd0206b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfd0206b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfd0206b

Branch: refs/heads/master
Commit: cfd0206b39b08691b832ea6324e02a5bd3a1533e
Parents: ac55726
Author: hequn8128 <ch...@gmail.com>
Authored: Wed Jul 4 22:11:38 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 5 18:06:26 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/streaming.md                     | 18 +++-----
 .../flink/table/client/config/Execution.java    |  4 +-
 .../apache/flink/table/api/queryConfig.scala    | 39 ++++++-----------
 .../flink/table/api/QueryConfigTest.scala       | 45 ++++++++++++++++++++
 .../table/runtime/harness/HarnessTestBase.scala | 10 +++++
 .../table/runtime/harness/JoinHarnessTest.scala |  4 +-
 .../runtime/harness/NonWindowHarnessTest.scala  |  2 +-
 .../runtime/harness/OverWindowHarnessTest.scala | 10 ++---
 .../StateCleaningCountTriggerHarnessTest.scala  |  4 +-
 ...yedProcessFunctionWithCleanupStateTest.scala |  4 +-
 .../ProcessFunctionWithCleanupStateTest.scala   |  4 +-
 11 files changed, 92 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/docs/dev/table/streaming.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index f8f19c0..d0fea47 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -507,7 +507,7 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 // obtain query configuration from TableEnvironment
 StreamQueryConfig qConfig = tableEnv.queryConfig();
 // set query parameters
-qConfig.withIdleStateRetentionTime(Time.hours(12));
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
 
 // define query
 Table result = ...
@@ -531,7 +531,7 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
 // obtain query configuration from TableEnvironment
 val qConfig: StreamQueryConfig = tableEnv.queryConfig
 // set query parameters
-qConfig.withIdleStateRetentionTime(Time.hours(12))
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
 
 // define query
 val result: Table = ???
@@ -579,10 +579,8 @@ The parameters are specified as follows:
 
 StreamQueryConfig qConfig = ...
 
-// set idle state retention time: min = 12 hour, max = 16 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16));
-// set idle state retention time. min = max = 12 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12);
+// set idle state retention time: min = 12 hours, max = 24 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
 
 {% endhighlight %}
 </div>
@@ -591,16 +589,14 @@ qConfig.withIdleStateRetentionTime(Time.hours(12);
 
 val qConfig: StreamQueryConfig = ???
 
-// set idle state retention time: min = 12 hour, max = 16 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16))
-// set idle state retention time. min = max = 12 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12)
+// set idle state retention time: min = 12 hours, max = 24 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
 
 {% endhighlight %}
 </div>
 </div>
 
-Configuring different minimum and maximum idle state retention times is more efficient because it reduces the internal book-keeping of a query for when to remove state.
+Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes.
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index 9a5ae47..0d6e6dd 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -75,11 +75,11 @@ public class Execution {
 	}
 
 	public long getMinStateRetention() {
-		return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION, Long.toString(Long.MIN_VALUE)));
+		return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION, Long.toString(0)));
 	}
 
 	public long getMaxStateRetention() {
-		return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_STATE_RETENTION, Long.toString(Long.MIN_VALUE)));
+		return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_STATE_RETENTION, Long.toString(0)));
 	}
 
 	public int getParallelism() {

http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
index 4aa5543..e0cdc05 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
@@ -41,32 +41,13 @@ class StreamQueryConfig private[table] extends QueryConfig {
     * The minimum time until state which was not updated will be retained.
     * State might be cleared and removed if it was not updated for the defined period of time.
     */
-  private var minIdleStateRetentionTime: Long = Long.MinValue
+  private var minIdleStateRetentionTime: Long = 0L
 
   /**
     * The maximum time until state which was not updated will be retained.
     * State will be cleared and removed if it was not updated for the defined period of time.
     */
-  private var maxIdleStateRetentionTime: Long = Long.MinValue
-
-  /**
-    * Specifies the time interval for how long idle state, i.e., state which was not updated, will
-    * be retained. When state was not updated for the specified interval of time, it will be cleared
-    * and removed.
-    *
-    * When new data arrives for previously cleaned-up state, the new data will be handled as if it
-    * was the first data. This can result in previous results being overwritten.
-    *
-    * Note: [[withIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows to set a minimum and
-    * maximum time for state to be retained. This method is more efficient, because the system has
-    * to do less bookkeeping to identify the time at which state must be cleared.
-    *
-    * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never
-    *             clean-up the state.
-    */
-  def withIdleStateRetentionTime(time: Time): StreamQueryConfig = {
-    withIdleStateRetentionTime(time, time)
-  }
+  private var maxIdleStateRetentionTime: Long = 0L
 
   /**
     * Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
@@ -79,14 +60,22 @@ class StreamQueryConfig private[table] extends QueryConfig {
     *
     * Set to 0 (zero) to never clean-up the state.
     *
+    * NOTE: Cleaning up state requires additional bookkeeping which becomes less expensive for
+    * larger differences of minTime and maxTime. The difference between minTime and maxTime must be
+    * at least 5 minutes.
+    *
     * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
     *                never clean-up the state.
-    * @param maxTime The maximum time interval for which idle state is retained. May not be smaller
-    *                than than minTime. Set to 0 (zero) to never clean-up the state.
+    * @param maxTime The maximum time interval for which idle state is retained. Must be at least
+    *                5 minutes greater than minTime. Set to 0 (zero) to never clean-up the state.
     */
   def withIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
-    if (maxTime.toMilliseconds < minTime.toMilliseconds) {
-      throw new IllegalArgumentException("maxTime may not be smaller than minTime.")
+
+    if (maxTime.toMilliseconds - minTime.toMilliseconds < 300000 &&
+      !(maxTime.toMilliseconds == 0 && minTime.toMilliseconds == 0)) {
+      throw new IllegalArgumentException(
+        s"Difference between minTime: ${minTime.toString} and maxTime: ${maxTime.toString} " +
+          s"shoud be at least 5 minutes.")
     }
     minIdleStateRetentionTime = minTime.toMilliseconds
     maxIdleStateRetentionTime = maxTime.toMilliseconds

http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/QueryConfigTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/QueryConfigTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/QueryConfigTest.scala
new file mode 100644
index 0000000..3a23bcf
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/QueryConfigTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.time.Time
+import org.junit.Test
+
+class QueryConfigTest {
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testMinBiggerThanMax(): Unit = {
+    new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(1))
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testMinEqualMax(): Unit = {
+    new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(1), Time.hours(1))
+  }
+
+  @Test
+  def testMinMaxZero(): Unit = {
+    new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(0), Time.hours(0))
+  }
+
+  @Test
+  def testPass(): Unit = {
+    new StreamQueryConfig().withIdleStateRetentionTime(Time.minutes(1), Time.minutes(6))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index 942846c..58fe9d3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.harness
 
 import java.util.{Comparator, Queue => JQueue}
 
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
@@ -27,6 +28,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.GeneratedAggregationsFunction
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
@@ -384,4 +386,12 @@ object HarnessTestBase {
       value.row.getField(selectorField).asInstanceOf[T]
     }
   }
+
+  /**
+    * Test class used to test min and max retention time.
+    */
+  class TestStreamQueryConfig(min: Time, max: Time) extends StreamQueryConfig {
+    override def getMinIdleStateRetentionTime: Long = min.toMilliseconds
+    override def getMaxIdleStateRetentionTime: Long = max.toMilliseconds
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index cc5a1fd..132ac4e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
 import org.apache.flink.table.api.{StreamQueryConfig, Types}
-import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks, TupleRowKeySelector}
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks, TestStreamQueryConfig, TupleRowKeySelector}
 import org.apache.flink.table.runtime.join._
 import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -44,7 +44,7 @@ import org.junit.Test
 class JoinHarnessTest extends HarnessTestBase {
 
   private val queryConfig =
-    new StreamQueryConfig().withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(4))
+    new TestStreamQueryConfig(Time.milliseconds(2), Time.milliseconds(4))
 
   private val rowType = Types.ROW(
     Types.LONG,

http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
index 5c31cb2..2f4bbfa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -34,7 +34,7 @@ import org.junit.Test
 class NonWindowHarnessTest extends HarnessTestBase {
 
   protected var queryConfig =
-    new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+    new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
 
   @Test
   def testNonWindow(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 218cae2..7ad64c6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -35,7 +35,7 @@ import org.junit.Test
 class OverWindowHarnessTest extends HarnessTestBase{
 
   protected var queryConfig: StreamQueryConfig =
-    new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+    new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
 
   @Test
   def testProcTimeBoundedRowsOver(): Unit = {
@@ -368,7 +368,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
         minMaxCRowType,
         4000,
         0,
-        new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
+        new TestStreamQueryConfig(Time.seconds(1), Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
@@ -518,7 +518,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
         minMaxCRowType,
         3,
         0,
-        new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
+        new TestStreamQueryConfig(Time.seconds(1), Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
@@ -665,7 +665,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
         minMaxAggregationStateType,
         minMaxCRowType,
         0,
-        new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
+        new TestStreamQueryConfig(Time.seconds(1), Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
@@ -801,7 +801,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
         minMaxAggregationStateType,
         minMaxCRowType,
         0,
-        new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
+        new TestStreamQueryConfig(Time.seconds(1), Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(

http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
index 93b89ca..7f9c0ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
@@ -22,14 +22,14 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
 import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
 import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
 class StateCleaningCountTriggerHarnessTest {
   protected var queryConfig =
-    new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+    new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
 
   @Test
   def testFiringAndFiringWithPurging(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
index c896666..fe90a5f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
 import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
 import org.apache.flink.util.Collector
 
 import org.junit.Test
@@ -36,8 +37,7 @@ class KeyedProcessFunctionWithCleanupStateTest extends HarnessTestBase {
 
   @Test
   def testStateCleaning(): Unit = {
-    val queryConfig = new StreamQueryConfig()
-      .withIdleStateRetentionTime(Time.milliseconds(5), Time.milliseconds(10))
+    val queryConfig = new TestStreamQueryConfig(Time.milliseconds(5), Time.milliseconds(10))
 
     val func = new MockedKeyedProcessFunction(queryConfig)
     val operator = new KeyedProcessOperator(func)

http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
index e773f4b..519b03f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
 import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
 import org.apache.flink.util.Collector
 import org.junit.Assert.assertEquals
 import org.junit.Test
@@ -36,8 +37,7 @@ class ProcessFunctionWithCleanupStateTest extends HarnessTestBase {
 
   @Test
   def testStateCleaning(): Unit = {
-    val queryConfig = new StreamQueryConfig()
-      .withIdleStateRetentionTime(Time.milliseconds(5), Time.milliseconds(10))
+    val queryConfig = new TestStreamQueryConfig(Time.milliseconds(5), Time.milliseconds(10))
 
     val func = new MockedProcessFunction(queryConfig)
     val operator = new LegacyKeyedProcessOperator(func)


[4/4] flink git commit: [FLINK-8094] [table] Extend ExistingField rowtime extractor to support ISO date strings.

Posted by fh...@apache.org.
[FLINK-8094] [table] Extend ExistingField rowtime extractor to support ISO date strings.

This closes #6253.

- This patch extends ExistingField so that it handles ISO date-formatted
  String fields in addition to Long and Timestamp fields.
- Add test code to cover ExistingField's new behavior.
  Credit for the test code goes to @xccui; the test method was copied from the following commit:
  https://github.com/xccui/flink/commit/afcc5f1a0ad92db08294199e61be5df72c1514f8
- Document the new behavior of ExistingField in sourceSinks.md.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc595354
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc595354
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc595354

Branch: refs/heads/master
Commit: cc595354e69d4ccb08b5e839095cc50fbe76b0e8
Parents: 5cb080c
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Jul 4 23:24:29 2018 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 5 21:48:22 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  3 +-
 .../sources/tsextractors/ExistingField.scala    | 24 ++++++-------
 .../validation/TableSourceValidationTest.scala  |  2 +-
 .../stream/table/TableSourceITCase.scala        | 37 ++++++++++++++++++++
 4 files changed, 52 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc595354/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 792f19d..4c5f2e2 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -460,7 +460,8 @@ val source: KafkaTableSource = Kafka010JsonTableSource.builder()
 Flink provides `TimestampExtractor` implementations for common use cases.
 The following `TimestampExtractor` implementations are currently available:
 
-* `ExistingField(fieldName)`: Extracts the value of a rowtime attribute from an existing `LONG` or `SQL_TIMESTAMP` field.
+* `ExistingField(fieldName)`: Extracts the value of a rowtime attribute from an existing `LONG` or `SQL_TIMESTAMP`, or ISO date formatted `STRING` field.
+  * One example of ISO date format would be '2018-05-28 12:34:56.000'.
 * `StreamRecordTimestamp()`: Extracts the value of a rowtime attribute from the timestamp of the `DataStream` `StreamRecord`. Note, this `TimestampExtractor` is not available for batch table sources.
 
 A custom `TimestampExtractor` can be defined by implementing the corresponding interface.

http://git-wip-us.apache.org/repos/asf/flink/blob/cc595354/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
index 6a2a418..9b091ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.table.sources.tsextractors
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
 
 /**
-  * Converts an existing [[Long]] or [[java.sql.Timestamp]] field into a rowtime attribute.
+  * Converts an existing [[Long]] or [[java.sql.Timestamp]], or
+  * ISO date formatted [[java.lang.String]] field into a rowtime attribute.
   *
   * @param field The field to convert into a rowtime attribute.
   */
@@ -32,27 +33,24 @@ final class ExistingField(val field: String) extends TimestampExtractor {
   override def getArgumentFields: Array[String] = Array(field)
 
   @throws[ValidationException]
-  override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = {
+  override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
+    val fieldType = argumentFieldTypes(0)
 
-    // get type of field to convert
-    val fieldType = physicalFieldTypes(0)
-
-    // check that the field to convert is of type Long or Timestamp
     fieldType match {
       case Types.LONG => // OK
       case Types.SQL_TIMESTAMP => // OK
+      case Types.STRING => // OK
       case _: TypeInformation[_] =>
         throw ValidationException(
-          s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.")
+          s"Field '$field' must be of type Long or Timestamp or String but is of type $fieldType.")
     }
   }
 
   /**
-    * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]] field into a
-    * rowtime attribute.
+    * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]], or
+    * ISO date formatted [[java.lang.String]] field into a rowtime attribute.
     */
-  def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
-
+  override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
     val fieldAccess: Expression = fieldAccesses(0)
 
     fieldAccess.resultType match {
@@ -62,6 +60,8 @@ final class ExistingField(val field: String) extends TimestampExtractor {
       case Types.SQL_TIMESTAMP =>
         // cast timestamp to long
         Cast(fieldAccess, Types.LONG)
+      case Types.STRING =>
+        Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cc595354/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
index 5b10eda..2cf3d78 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
@@ -212,7 +212,7 @@ class TableSourceValidationTest {
     val schema = new TableSchema(
       fieldNames,
       Array(Types.LONG, Types.SQL_TIMESTAMP, Types.INT))
-    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "name")
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "amount")
 
     // should fail because configured rowtime field is not of type Long or Timestamp
     tEnv.registerTableSource("testTable", ts)

http://git-wip-us.apache.org/repos/asf/flink/blob/cc595354/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index d1a88b7..ebd74de 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -301,6 +301,42 @@ class TableSourceITCase extends AbstractTestBase {
   }
 
   @Test
+  def testRowtimeStringTableSource(): Unit = {
+    StreamITCase.testResults = mutable.MutableList()
+    val tableName = "MyTable"
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      "1970-01-01 00:00:00",
+      "1970-01-01 00:00:01",
+      "1970-01-01 00:00:01",
+      "1970-01-01 00:00:02",
+      "1970-01-01 00:00:04")
+
+    val schema = new TableSchema(Array("rtime"), Array(Types.SQL_TIMESTAMP))
+    val returnType = Types.STRING
+
+    val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", null)
+    tEnv.registerTableSource(tableName, tableSource)
+
+    tEnv.scan(tableName)
+      .window(Tumble over 1.second on 'rtime as 'w)
+      .groupBy('w)
+      .select('w.start, 1.count)
+      .addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.0,1",
+      "1970-01-01 00:00:01.0,2",
+      "1970-01-01 00:00:02.0,1",
+      "1970-01-01 00:00:04.0,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
   def testProctimeStringTableSource(): Unit = {
     StreamITCase.testResults = mutable.MutableList()
     val tableName = "MyTable"
@@ -741,4 +777,5 @@ class TableSourceITCase extends AbstractTestBase {
     val expected = Seq("(1,A,1)", "(6,C,10)", "(6,D,20)")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
 }


[3/4] flink git commit: [FLINK-9742] [table] Add helper method to access private Expression.resultType.

Posted by fh...@apache.org.
[FLINK-9742] [table] Add helper method to access private Expression.resultType.

This closes #6252.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cb080cd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cb080cd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cb080cd

Branch: refs/heads/master
Commit: 5cb080cd785658fcb817a00f51e12d6fcbc78b33
Parents: 84fbbfe
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jul 5 21:57:46 2018 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 5 21:48:21 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/table/expressions/ExpressionUtils.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5cb080cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
index 013c8ac..e794d41 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -22,13 +22,22 @@ import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float =
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
 
 object ExpressionUtils {
+  /**
+    * Retrieves the result type of the given [[Expression]].
+    *
+    * @param expr The expression whose result type is requested
+    * @return     The result type of the given expression
+    */
+  def getResultType(expr: Expression): TypeInformation[_] = {
+    expr.resultType
+  }
 
   private[flink] def isTimeIntervalLiteral(expr: Expression): Boolean = expr match {
     case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true


[2/4] flink git commit: [FLINK-9581] [docs] Remove white spaces to align COLLECT code example.

Posted by fh...@apache.org.
[FLINK-9581] [docs] Remove white spaces to align COLLECT code example.

This closes #6161


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84fbbfe1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84fbbfe1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84fbbfe1

Branch: refs/heads/master
Commit: 84fbbfe1258c6c9c9aed919946f9652f7198f96b
Parents: cfd0206
Author: snuyanzin <sn...@gmail.com>
Authored: Wed Jun 13 20:30:26 2018 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 5 21:48:21 2018 +0200

----------------------------------------------------------------------
 docs/dev/table/sql.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84fbbfe1/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 1eff326..57e0ba5 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -2327,8 +2327,8 @@ VAR_SAMP(value)
     <tr>
       <td>
           {% highlight text %}
-          COLLECT(value)
-          {% endhighlight %}
+COLLECT(value)
+{% endhighlight %}
       </td>
       <td>
           <p>Returns a multiset of the <i>value</i>s. null input <i>value</i> will be ignored. Return an empty multiset if only null values are added. </p>