You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/07/05 19:49:36 UTC
[1/4] flink git commit: [FLINK-9681] [table] Enforce 5 minute
difference between minRetentionTime and maxRetentionTime.
Repository: flink
Updated Branches:
refs/heads/master ac5572645 -> cc595354e
[FLINK-9681] [table] Enforce 5 minute difference between minRetentionTime and maxRetentionTime.
This closes #6255.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfd0206b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfd0206b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfd0206b
Branch: refs/heads/master
Commit: cfd0206b39b08691b832ea6324e02a5bd3a1533e
Parents: ac55726
Author: hequn8128 <ch...@gmail.com>
Authored: Wed Jul 4 22:11:38 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 5 18:06:26 2018 +0200
----------------------------------------------------------------------
docs/dev/table/streaming.md | 18 +++-----
.../flink/table/client/config/Execution.java | 4 +-
.../apache/flink/table/api/queryConfig.scala | 39 ++++++-----------
.../flink/table/api/QueryConfigTest.scala | 45 ++++++++++++++++++++
.../table/runtime/harness/HarnessTestBase.scala | 10 +++++
.../table/runtime/harness/JoinHarnessTest.scala | 4 +-
.../runtime/harness/NonWindowHarnessTest.scala | 2 +-
.../runtime/harness/OverWindowHarnessTest.scala | 10 ++---
.../StateCleaningCountTriggerHarnessTest.scala | 4 +-
...yedProcessFunctionWithCleanupStateTest.scala | 4 +-
.../ProcessFunctionWithCleanupStateTest.scala | 4 +-
11 files changed, 92 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/docs/dev/table/streaming.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index f8f19c0..d0fea47 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -507,7 +507,7 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
-qConfig.withIdleStateRetentionTime(Time.hours(12));
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
// define query
Table result = ...
@@ -531,7 +531,7 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
// obtain query configuration from TableEnvironment
val qConfig: StreamQueryConfig = tableEnv.queryConfig
// set query parameters
-qConfig.withIdleStateRetentionTime(Time.hours(12))
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
// define query
val result: Table = ???
@@ -579,10 +579,8 @@ The parameters are specified as follows:
StreamQueryConfig qConfig = ...
-// set idle state retention time: min = 12 hour, max = 16 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16));
-// set idle state retention time. min = max = 12 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12);
+// set idle state retention time: min = 12 hours, max = 24 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
{% endhighlight %}
</div>
@@ -591,16 +589,14 @@ qConfig.withIdleStateRetentionTime(Time.hours(12);
val qConfig: StreamQueryConfig = ???
-// set idle state retention time: min = 12 hour, max = 16 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16))
-// set idle state retention time. min = max = 12 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12)
+// set idle state retention time: min = 12 hours, max = 24 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
{% endhighlight %}
</div>
</div>
-Configuring different minimum and maximum idle state retention times is more efficient because it reduces the internal book-keeping of a query for when to remove state.
+Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes.
{% top %}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
index 9a5ae47..0d6e6dd 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Execution.java
@@ -75,11 +75,11 @@ public class Execution {
}
public long getMinStateRetention() {
- return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION, Long.toString(Long.MIN_VALUE)));
+ return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MIN_STATE_RETENTION, Long.toString(0)));
}
public long getMaxStateRetention() {
- return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_STATE_RETENTION, Long.toString(Long.MIN_VALUE)));
+ return Long.parseLong(properties.getOrDefault(PropertyStrings.EXECUTION_MAX_STATE_RETENTION, Long.toString(0)));
}
public int getParallelism() {
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
index 4aa5543..e0cdc05 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
@@ -41,32 +41,13 @@ class StreamQueryConfig private[table] extends QueryConfig {
* The minimum time until state which was not updated will be retained.
* State might be cleared and removed if it was not updated for the defined period of time.
*/
- private var minIdleStateRetentionTime: Long = Long.MinValue
+ private var minIdleStateRetentionTime: Long = 0L
/**
* The maximum time until state which was not updated will be retained.
* State will be cleared and removed if it was not updated for the defined period of time.
*/
- private var maxIdleStateRetentionTime: Long = Long.MinValue
-
- /**
- * Specifies the time interval for how long idle state, i.e., state which was not updated, will
- * be retained. When state was not updated for the specified interval of time, it will be cleared
- * and removed.
- *
- * When new data arrives for previously cleaned-up state, the new data will be handled as if it
- * was the first data. This can result in previous results being overwritten.
- *
- * Note: [[withIdleStateRetentionTime(minTime: Time, maxTime: Time)]] allows to set a minimum and
- * maximum time for state to be retained. This method is more efficient, because the system has
- * to do less bookkeeping to identify the time at which state must be cleared.
- *
- * @param time The time interval for how long idle state is retained. Set to 0 (zero) to never
- * clean-up the state.
- */
- def withIdleStateRetentionTime(time: Time): StreamQueryConfig = {
- withIdleStateRetentionTime(time, time)
- }
+ private var maxIdleStateRetentionTime: Long = 0L
/**
* Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
@@ -79,14 +60,22 @@ class StreamQueryConfig private[table] extends QueryConfig {
*
* Set to 0 (zero) to never clean-up the state.
*
+ * NOTE: Cleaning up state requires additional bookkeeping which becomes less expensive for
+ * larger differences of minTime and maxTime. The difference between minTime and maxTime must be
+ * at least 5 minutes.
+ *
* @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
* never clean-up the state.
- * @param maxTime The maximum time interval for which idle state is retained. May not be smaller
- * than than minTime. Set to 0 (zero) to never clean-up the state.
+ * @param maxTime The maximum time interval for which idle state is retained. Must be at least
+ * 5 minutes greater than minTime. Set to 0 (zero) to never clean-up the state.
*/
def withIdleStateRetentionTime(minTime: Time, maxTime: Time): StreamQueryConfig = {
- if (maxTime.toMilliseconds < minTime.toMilliseconds) {
- throw new IllegalArgumentException("maxTime may not be smaller than minTime.")
+
+ if (maxTime.toMilliseconds - minTime.toMilliseconds < 300000 &&
+ !(maxTime.toMilliseconds == 0 && minTime.toMilliseconds == 0)) {
+ throw new IllegalArgumentException(
+ s"Difference between minTime: ${minTime.toString} and maxTime: ${maxTime.toString} " +
+ s"shoud be at least 5 minutes.")
}
minIdleStateRetentionTime = minTime.toMilliseconds
maxIdleStateRetentionTime = maxTime.toMilliseconds
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/QueryConfigTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/QueryConfigTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/QueryConfigTest.scala
new file mode 100644
index 0000000..3a23bcf
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/QueryConfigTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.time.Time
+import org.junit.Test
+
+class QueryConfigTest {
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testMinBiggerThanMax(): Unit = {
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(1))
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testMinEqualMax(): Unit = {
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(1), Time.hours(1))
+ }
+
+ @Test
+ def testMinMaxZero(): Unit = {
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(0), Time.hours(0))
+ }
+
+ @Test
+ def testPass(): Unit = {
+ new StreamQueryConfig().withIdleStateRetentionTime(Time.minutes(1), Time.minutes(6))
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index 942846c..58fe9d3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.harness
import java.util.{Comparator, Queue => JQueue}
+import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
@@ -27,6 +28,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.GeneratedAggregationsFunction
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
@@ -384,4 +386,12 @@ object HarnessTestBase {
value.row.getField(selectorField).asInstanceOf[T]
}
}
+
+ /**
+ * Test class used to test min and max retention time.
+ */
+ class TestStreamQueryConfig(min: Time, max: Time) extends StreamQueryConfig {
+ override def getMinIdleStateRetentionTime: Long = min.toMilliseconds
+ override def getMaxIdleStateRetentionTime: Long = max.toMilliseconds
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
index cc5a1fd..132ac4e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness
import org.apache.flink.table.api.{StreamQueryConfig, Types}
-import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks, TupleRowKeySelector}
+import org.apache.flink.table.runtime.harness.HarnessTestBase.{RowResultSortComparator, RowResultSortComparatorWithWatermarks, TestStreamQueryConfig, TupleRowKeySelector}
import org.apache.flink.table.runtime.join._
import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -44,7 +44,7 @@ import org.junit.Test
class JoinHarnessTest extends HarnessTestBase {
private val queryConfig =
- new StreamQueryConfig().withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(4))
+ new TestStreamQueryConfig(Time.milliseconds(2), Time.milliseconds(4))
private val rowType = Types.ROW(
Types.LONG,
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
index 5c31cb2..2f4bbfa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -34,7 +34,7 @@ import org.junit.Test
class NonWindowHarnessTest extends HarnessTestBase {
protected var queryConfig =
- new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+ new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
@Test
def testNonWindow(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 218cae2..7ad64c6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -35,7 +35,7 @@ import org.junit.Test
class OverWindowHarnessTest extends HarnessTestBase{
protected var queryConfig: StreamQueryConfig =
- new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+ new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
@Test
def testProcTimeBoundedRowsOver(): Unit = {
@@ -368,7 +368,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
minMaxCRowType,
4000,
0,
- new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
+ new TestStreamQueryConfig(Time.seconds(1), Time.seconds(2))))
val testHarness =
createHarnessTester(
@@ -518,7 +518,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
minMaxCRowType,
3,
0,
- new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
+ new TestStreamQueryConfig(Time.seconds(1), Time.seconds(2))))
val testHarness =
createHarnessTester(
@@ -665,7 +665,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
minMaxAggregationStateType,
minMaxCRowType,
0,
- new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
+ new TestStreamQueryConfig(Time.seconds(1), Time.seconds(2))))
val testHarness =
createHarnessTester(
@@ -801,7 +801,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
minMaxAggregationStateType,
minMaxCRowType,
0,
- new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
+ new TestStreamQueryConfig(Time.seconds(1), Time.seconds(2))))
val testHarness =
createHarnessTester(
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
index 93b89ca..7f9c0ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/StateCleaningCountTriggerHarnessTest.scala
@@ -22,14 +22,14 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.streaming.runtime.operators.windowing.TriggerTestHarness
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
import org.junit.Assert.assertEquals
import org.junit.Test
class StateCleaningCountTriggerHarnessTest {
protected var queryConfig =
- new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+ new TestStreamQueryConfig(Time.seconds(2), Time.seconds(3))
@Test
def testFiringAndFiringWithPurging(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
index c896666..fe90a5f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/KeyedProcessFunctionWithCleanupStateTest.scala
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
import org.apache.flink.util.Collector
import org.junit.Test
@@ -36,8 +37,7 @@ class KeyedProcessFunctionWithCleanupStateTest extends HarnessTestBase {
@Test
def testStateCleaning(): Unit = {
- val queryConfig = new StreamQueryConfig()
- .withIdleStateRetentionTime(Time.milliseconds(5), Time.milliseconds(10))
+ val queryConfig = new TestStreamQueryConfig(Time.milliseconds(5), Time.milliseconds(10))
val func = new MockedKeyedProcessFunction(queryConfig)
val operator = new KeyedProcessOperator(func)
http://git-wip-us.apache.org/repos/asf/flink/blob/cfd0206b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
index e773f4b..519b03f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.runtime.harness.HarnessTestBase.TestStreamQueryConfig
import org.apache.flink.util.Collector
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -36,8 +37,7 @@ class ProcessFunctionWithCleanupStateTest extends HarnessTestBase {
@Test
def testStateCleaning(): Unit = {
- val queryConfig = new StreamQueryConfig()
- .withIdleStateRetentionTime(Time.milliseconds(5), Time.milliseconds(10))
+ val queryConfig = new TestStreamQueryConfig(Time.milliseconds(5), Time.milliseconds(10))
val func = new MockedProcessFunction(queryConfig)
val operator = new LegacyKeyedProcessOperator(func)
[4/4] flink git commit: [FLINK-8094] [table] Extend ExistingField
rowtime extractor to support ISO date strings.
Posted by fh...@apache.org.
[FLINK-8094] [table] Extend ExistingField rowtime extractor to support ISO date strings.
This closes #6253.
- This patch proposes an improvement of ExistingField so that it handles ISO date formatted
String type as well as Long and Timestamp types.
- Add test code to cover ExistingField's new behavior.
The credit for the test code should go to @xccui, since I copied the test method from the commit below:
https://github.com/xccui/flink/commit/afcc5f1a0ad92db08294199e61be5df72c1514f8
- Document new behavior of ExistingField to sourceSinks.md.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc595354
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc595354
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc595354
Branch: refs/heads/master
Commit: cc595354e69d4ccb08b5e839095cc50fbe76b0e8
Parents: 5cb080c
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Jul 4 23:24:29 2018 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 5 21:48:22 2018 +0200
----------------------------------------------------------------------
docs/dev/table/sourceSinks.md | 3 +-
.../sources/tsextractors/ExistingField.scala | 24 ++++++-------
.../validation/TableSourceValidationTest.scala | 2 +-
.../stream/table/TableSourceITCase.scala | 37 ++++++++++++++++++++
4 files changed, 52 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cc595354/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 792f19d..4c5f2e2 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -460,7 +460,8 @@ val source: KafkaTableSource = Kafka010JsonTableSource.builder()
Flink provides `TimestampExtractor` implementations for common use cases.
The following `TimestampExtractor` implementations are currently available:
-* `ExistingField(fieldName)`: Extracts the value of a rowtime attribute from an existing `LONG` or `SQL_TIMESTAMP` field.
+* `ExistingField(fieldName)`: Extracts the value of a rowtime attribute from an existing `LONG`, `SQL_TIMESTAMP`, or ISO date formatted `STRING` field.
+ * One example of ISO date format would be '2018-05-28 12:34:56.000'.
* `StreamRecordTimestamp()`: Extracts the value of a rowtime attribute from the timestamp of the `DataStream` `StreamRecord`. Note, this `TimestampExtractor` is not available for batch table sources.
A custom `TimestampExtractor` can be defined by implementing the corresponding interface.
http://git-wip-us.apache.org/repos/asf/flink/blob/cc595354/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
index 6a2a418..9b091ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -18,12 +18,13 @@
package org.apache.flink.table.sources.tsextractors
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.table.api.{Types, ValidationException}
import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
/**
- * Converts an existing [[Long]] or [[java.sql.Timestamp]] field into a rowtime attribute.
+ * Converts an existing [[Long]], [[java.sql.Timestamp]], or
+ * ISO date formatted [[java.lang.String]] field into a rowtime attribute.
*
* @param field The field to convert into a rowtime attribute.
*/
@@ -32,27 +33,24 @@ final class ExistingField(val field: String) extends TimestampExtractor {
override def getArgumentFields: Array[String] = Array(field)
@throws[ValidationException]
- override def validateArgumentFields(physicalFieldTypes: Array[TypeInformation[_]]): Unit = {
+ override def validateArgumentFields(argumentFieldTypes: Array[TypeInformation[_]]): Unit = {
+ val fieldType = argumentFieldTypes(0)
- // get type of field to convert
- val fieldType = physicalFieldTypes(0)
-
- // check that the field to convert is of type Long or Timestamp
fieldType match {
case Types.LONG => // OK
case Types.SQL_TIMESTAMP => // OK
+ case Types.STRING => // OK
case _: TypeInformation[_] =>
throw ValidationException(
- s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.")
+ s"Field '$field' must be of type Long or Timestamp or String but is of type $fieldType.")
}
}
/**
- * Returns an [[Expression]] that casts a [[Long]] or [[java.sql.Timestamp]] field into a
- * rowtime attribute.
+ * Returns an [[Expression]] that casts a [[Long]], [[java.sql.Timestamp]], or
+ * ISO date formatted [[java.lang.String]] field into a rowtime attribute.
*/
- def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
-
+ override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
val fieldAccess: Expression = fieldAccesses(0)
fieldAccess.resultType match {
@@ -62,6 +60,8 @@ final class ExistingField(val field: String) extends TimestampExtractor {
case Types.SQL_TIMESTAMP =>
// cast timestamp to long
Cast(fieldAccess, Types.LONG)
+ case Types.STRING =>
+ Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc595354/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
index 5b10eda..2cf3d78 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
@@ -212,7 +212,7 @@ class TableSourceValidationTest {
val schema = new TableSchema(
fieldNames,
Array(Types.LONG, Types.SQL_TIMESTAMP, Types.INT))
- val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "name")
+ val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "amount")
// should fail because configured rowtime field is not of type Long or Timestamp
tEnv.registerTableSource("testTable", ts)
http://git-wip-us.apache.org/repos/asf/flink/blob/cc595354/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
index d1a88b7..ebd74de 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala
@@ -301,6 +301,42 @@ class TableSourceITCase extends AbstractTestBase {
}
@Test
+ def testRowtimeStringTableSource(): Unit = {
+ StreamITCase.testResults = mutable.MutableList()
+ val tableName = "MyTable"
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val data = Seq(
+ "1970-01-01 00:00:00",
+ "1970-01-01 00:00:01",
+ "1970-01-01 00:00:01",
+ "1970-01-01 00:00:02",
+ "1970-01-01 00:00:04")
+
+ val schema = new TableSchema(Array("rtime"), Array(Types.SQL_TIMESTAMP))
+ val returnType = Types.STRING
+
+ val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", null)
+ tEnv.registerTableSource(tableName, tableSource)
+
+ tEnv.scan(tableName)
+ .window(Tumble over 1.second on 'rtime as 'w)
+ .groupBy('w)
+ .select('w.start, 1.count)
+ .addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.0,1",
+ "1970-01-01 00:00:01.0,2",
+ "1970-01-01 00:00:02.0,1",
+ "1970-01-01 00:00:04.0,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
def testProctimeStringTableSource(): Unit = {
StreamITCase.testResults = mutable.MutableList()
val tableName = "MyTable"
@@ -741,4 +777,5 @@ class TableSourceITCase extends AbstractTestBase {
val expected = Seq("(1,A,1)", "(6,C,10)", "(6,D,20)")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
}
[3/4] flink git commit: [FLINK-9742] [table] Add helper method to
access private Expression.resultType.
Posted by fh...@apache.org.
[FLINK-9742] [table] Add helper method to access private Expression.resultType.
This closes #6252.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cb080cd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cb080cd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cb080cd
Branch: refs/heads/master
Commit: 5cb080cd785658fcb817a00f51e12d6fcbc78b33
Parents: 84fbbfe
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Jul 5 21:57:46 2018 +0900
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 5 21:48:21 2018 +0200
----------------------------------------------------------------------
.../apache/flink/table/expressions/ExpressionUtils.scala | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5cb080cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
index 013c8ac..e794d41 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -22,13 +22,22 @@ import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float =
import java.math.{BigDecimal => JBigDecimal}
import java.sql.{Date, Time, Timestamp}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
object ExpressionUtils {
+ /**
+ * Retrieves the result type of the given Expression.
+ *
+ * @param expr The expression whose result type the caller is interested in
+ * @return The result type of the given Expression
+ */
+ def getResultType(expr: Expression): TypeInformation[_] = {
+ expr.resultType
+ }
private[flink] def isTimeIntervalLiteral(expr: Expression): Boolean = expr match {
case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
[2/4] flink git commit: [FLINK-9581] [docs] Remove white spaces to
align COLLECT code example.
Posted by fh...@apache.org.
[FLINK-9581] [docs] Remove white spaces to align COLLECT code example.
This closes #6161
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84fbbfe1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84fbbfe1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84fbbfe1
Branch: refs/heads/master
Commit: 84fbbfe1258c6c9c9aed919946f9652f7198f96b
Parents: cfd0206
Author: snuyanzin <sn...@gmail.com>
Authored: Wed Jun 13 20:30:26 2018 +0300
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Jul 5 21:48:21 2018 +0200
----------------------------------------------------------------------
docs/dev/table/sql.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84fbbfe1/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 1eff326..57e0ba5 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -2327,8 +2327,8 @@ VAR_SAMP(value)
<tr>
<td>
{% highlight text %}
- COLLECT(value)
- {% endhighlight %}
+COLLECT(value)
+{% endhighlight %}
</td>
<td>
<p>Returns a multiset of the <i>value</i>s. Null input <i>value</i>s are ignored. Returns an empty multiset if only null values are added.</p>