Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/27 08:19:05 UTC

[GitHub] [flink] wuchong commented on a change in pull request #12880: [FLINK-18555][table sql/api] Make TableConfig options can be configur…

wuchong commented on a change in pull request #12880:
URL: https://github.com/apache/flink/pull/12880#discussion_r460697474



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
##########
@@ -51,4 +52,24 @@ private TableConfigOptions() {}
 			.withDescription("The SQL dialect defines how to parse a SQL query. " +
 					"A different SQL dialect may support different SQL grammar. " +
 					"Currently supported dialects are: default and hive");
+
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+	public static final ConfigOption<String> LOCAL_TIME_ZONE = key("table.local-time-zone")
+			.stringType()
+			// special value to decide whether to use ZoneId.systemDefault() in TableConfig.getLocalTimeZone()
+			.defaultValue("System")

Review comment:
       What about using `(system-default)` here? It reads more like a special default value, similar to `(none)`.
   Also, please store "(system-default)" in a static final constant.
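   A minimal sketch of the suggested sentinel handling. The constant `SYSTEM_DEFAULT_TIME_ZONE` and the helper `resolveTimeZone` are hypothetical names, not Flink API. The sentinel maps to `ZoneId.systemDefault()`, and any other value is parsed with `ZoneId.of`:

   ```java
   import java.time.ZoneId;

   // Hypothetical holder for the sentinel value; not part of Flink.
   final class LocalTimeZoneSketch {
       // Special default value, kept in one constant instead of repeating the literal.
       static final String SYSTEM_DEFAULT_TIME_ZONE = "(system-default)";

       private LocalTimeZoneSketch() {}

       // Resolves the configured string to a ZoneId.
       static ZoneId resolveTimeZone(String configured) {
           if (SYSTEM_DEFAULT_TIME_ZONE.equals(configured)) {
               return ZoneId.systemDefault();
           }
           // Accepts region ids such as "Asia/Shanghai" and offsets such as "+08:00".
           return ZoneId.of(configured);
       }
   }
   ```

   With the literal kept in one place, `TableConfig.getLocalTimeZone()` and the option's default can both refer to the constant.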

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
 		}
 		minIdleStateRetentionTime = minTime.toMilliseconds();
 		maxIdleStateRetentionTime = maxTime.toMilliseconds();

Review comment:
       Can the member variables `minIdleStateRetentionTime` and `maxIdleStateRetentionTime` be removed?

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
 		}
 		minIdleStateRetentionTime = minTime.toMilliseconds();
 		maxIdleStateRetentionTime = maxTime.toMilliseconds();
+		setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+	}
+
+	/**
+	 * Specifies a retention time interval for how long idle state, i.e., state which
+	 * was not updated, will be retained.
+	 * State will never be cleared until it was idle for less than the retention time and will never
+	 * be kept if it was idle for more than the 1.5 * retention time.
+	 *
+	 * <p>When new data arrives for previously cleaned-up state, the new data will be handled as if it
+	 * was the first data. This can result in previous results being overwritten.
+	 *
+	 * <p>Set to 0 (zero) to never clean-up the state.
+	 *
+	 * <p>NOTE: Cleaning up state requires additional bookkeeping which becomes less expensive for
+	 * larger differences of minTime and maxTime. The difference between minTime and maxTime must be
+	 * at least 5 minutes.
+	 *
+	 * @param duration The retention time interval for which idle state is retained. Set to 0 (zero) to
+	 *                never clean-up the state.
+	 */
+	public void setIdleStateRetentionDuration(Duration duration){
+		configuration.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, duration);
 	}
 
 	/**
 	 * @return The minimum time until state which was not updated will be retained.
 	 */
 	public long getMinIdleStateRetentionTime() {

Review comment:
       Add a new method `getIdleStateRetentionTime` and deprecate `getMinIdleStateRetentionTime` and `getMaxIdleStateRetentionTime`.
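   A sketch of the suggested getter layout, assuming the retention is stored once as a `Duration`. `IdleStateRetentionSketch` is a hypothetical stand-in for `TableConfig`, and its single field takes the place of the `table.exec.state.ttl` entry in `configuration`. The deprecated getters derive their values using the 1.5 multiplier from the diff:

   ```java
   import java.time.Duration;

   // Hypothetical stand-in for TableConfig: one Duration replaces the
   // minIdleStateRetentionTime / maxIdleStateRetentionTime fields.
   class IdleStateRetentionSketch {
       private Duration idleStateRetention = Duration.ZERO; // 0 = never clean up

       public void setIdleStateRetentionTime(Duration duration) {
           this.idleStateRetention = duration;
       }

       /** Returns the configured idle state retention. */
       public Duration getIdleStateRetentionTime() {
           return idleStateRetention;
       }

       /**
        * @deprecated use {@link #getIdleStateRetentionTime()} instead.
        */
       @Deprecated
       public long getMinIdleStateRetentionTime() {
           return idleStateRetention.toMillis();
       }

       /**
        * @deprecated use {@link #getIdleStateRetentionTime()} instead; the maximum
        *     is derived as 1.5 x the configured retention.
        */
       @Deprecated
       public long getMaxIdleStateRetentionTime() {
           return idleStateRetention.toMillis() * 3 / 2;
       }
   }
   ```

   With a 1 h retention this yields 3,600,000 ms for the minimum and 5,400,000 ms for the maximum, matching the `* 3 / 2` expectation in `TableConfigTest`.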

##########
File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TableConfig}.
+ */
+public class TableConfigTest {
+	@Test
+	public void testGetAndSetTableConfigOptions() throws Exception{
+		Class<?> configClass = TableConfig.class;
+		TableConfig config = new TableConfig();
+		for (TestSpec<?> spec: testSpecList){
+			configClass.getMethod("set" + spec.fieldName, spec.inputClass).invoke(config, spec.inputValue);
+			assertEquals(spec.expectedValue, configClass.getMethod("get" + spec.fieldName).invoke(config));
+		}
+	}
+
+	@Test
+	public void testGetAndSetIdleStateRetentionDuration(){
+		TableConfig config = new TableConfig();
+		config.setIdleStateRetentionDuration(Duration.ofHours(1));
+		assertEquals(Duration.ofHours(1).toMillis(), config.getMinIdleStateRetentionTime());
+		assertEquals(Duration.ofHours(1).toMillis() * 3 / 2, config.getMaxIdleStateRetentionTime());

Review comment:
       Please also test the string option. 

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -49,6 +51,15 @@
 				"tasks to advance their watermarks without the need to wait for " +
 				"watermarks from this source while it is idle.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+	public static final ConfigOption<Duration> IDLE_STATE_RETENTION =
+		key("table.exec.state.ttl")
+			.durationType()
+			.defaultValue(Duration.ofMillis(0))
+			.withDescription("A time-to-live (TTL) can be assigned to the keyed state of any type. " +
+				"If a TTL is configured and a state value has expired, " +
+				"the stored value will be cleaned up on a best effort basis.");
+

Review comment:
       Add a description of the default value, for example: `Default is 0 (zero), which means state is never cleaned up`.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -274,9 +263,10 @@ public void setMaxGeneratedCodeLength(Integer maxGeneratedCodeLength) {
 	 *
 	 * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
 	 *                never clean-up the state.
-	 * @param maxTime The maximum time interval for which idle state is retained. Must be at least
-	 *                5 minutes greater than minTime. Set to 0 (zero) to never clean-up the state.
+	 * @param maxTime Currently maxTime will be ignored and it will automatically derived from minTime
+	 *                as 1.5 x minTime.
 	 */
+	@Deprecated

Review comment:
       Add a `@deprecated` Javadoc tag to this method that tells users which method to use instead.
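   One way the deprecation Javadoc could read, sketched against a hypothetical stand-in class. `java.time.Duration` replaces Flink's `Time` parameters so the snippet compiles on its own, and the replacement name `setIdleStateRetentionTime(Duration)` is an assumption based on the naming preference stated later in this review:

   ```java
   import java.time.Duration;

   // Hypothetical stand-in for TableConfig; Duration replaces Flink's Time here.
   class DeprecatedRetentionSetterSketch {
       private Duration idleStateRetention = Duration.ZERO;

       /**
        * Specifies a minimum and a maximum time interval for how long idle state is retained.
        *
        * @param minTime The minimum time interval for which idle state is retained.
        * @param maxTime The maximum time interval for which idle state is retained.
        *     NOTE: currently ignored; it is inferred from minTime with a 1.5 multiplier.
        * @deprecated use {@link #setIdleStateRetentionTime(Duration)} instead.
        */
       @Deprecated
       public void setIdleStateRetentionTime(Duration minTime, Duration maxTime) {
           // Only minTime is kept; maxTime no longer has any effect.
           setIdleStateRetentionTime(minTime);
       }

       public void setIdleStateRetentionTime(Duration duration) {
           this.idleStateRetention = duration;
       }

       public Duration getIdleStateRetentionTime() {
           return idleStateRetention;
       }
   }
   ```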

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -49,6 +51,15 @@
 				"tasks to advance their watermarks without the need to wait for " +
 				"watermarks from this source while it is idle.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+	public static final ConfigOption<Duration> IDLE_STATE_RETENTION =
+		key("table.exec.state.ttl")
+			.durationType()
+			.defaultValue(Duration.ofMillis(0))
+			.withDescription("A time-to-live (TTL) can be assigned to the keyed state of any type. " +
+				"If a TTL is configured and a state value has expired, " +
+				"the stored value will be cleaned up on a best effort basis.");
+

Review comment:
       Could you move this before `Source Options` and add a `State Options` comment above it?

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
 		}
 		minIdleStateRetentionTime = minTime.toMilliseconds();
 		maxIdleStateRetentionTime = maxTime.toMilliseconds();
+		setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+	}
+
+	/**
+	 * Specifies a retention time interval for how long idle state, i.e., state which
+	 * was not updated, will be retained.
+	 * State will never be cleared until it was idle for less than the retention time and will never
+	 * be kept if it was idle for more than the 1.5 * retention time.
+	 *
+	 * <p>When new data arrives for previously cleaned-up state, the new data will be handled as if it
+	 * was the first data. This can result in previous results being overwritten.
+	 *
+	 * <p>Set to 0 (zero) to never clean-up the state.
+	 *
+	 * <p>NOTE: Cleaning up state requires additional bookkeeping which becomes less expensive for
+	 * larger differences of minTime and maxTime. The difference between minTime and maxTime must be
+	 * at least 5 minutes.
+	 *
+	 * @param duration The retention time interval for which idle state is retained. Set to 0 (zero) to
+	 *                never clean-up the state.
+	 */
+	public void setIdleStateRetentionDuration(Duration duration){

Review comment:
       I'd prefer to keep the existing name `setIdleStateRetentionTime`, or to drop the `Time`/`Duration` suffix altogether.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
 		}
 		minIdleStateRetentionTime = minTime.toMilliseconds();
 		maxIdleStateRetentionTime = maxTime.toMilliseconds();
+		setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+	}
+
+	/**
+	 * Specifies a retention time interval for how long idle state, i.e., state which
+	 * was not updated, will be retained.
+	 * State will never be cleared until it was idle for less than the retention time and will never
+	 * be kept if it was idle for more than the 1.5 * retention time.
+	 *
+	 * <p>When new data arrives for previously cleaned-up state, the new data will be handled as if it
+	 * was the first data. This can result in previous results being overwritten.
+	 *
+	 * <p>Set to 0 (zero) to never clean-up the state.
+	 *
+	 * <p>NOTE: Cleaning up state requires additional bookkeeping which becomes less expensive for
+	 * larger differences of minTime and maxTime. The difference between minTime and maxTime must be
+	 * at least 5 minutes.

Review comment:
       Remove the NOTE.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
 		}
 		minIdleStateRetentionTime = minTime.toMilliseconds();
 		maxIdleStateRetentionTime = maxTime.toMilliseconds();
+		setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+	}
+
+	/**
+	 * Specifies a retention time interval for how long idle state, i.e., state which
+	 * was not updated, will be retained.
+	 * State will never be cleared until it was idle for less than the retention time and will never
+	 * be kept if it was idle for more than the 1.5 * retention time.

Review comment:
       ```suggestion
   	 * State will never be cleared while it has been idle for less than the retention time, and will be cleared on a best-effort basis after the retention time.
   ```
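   The contract in the suggested sentence can be modelled as a simple predicate. This is a hypothetical sketch, not Flink's TTL implementation: it only answers whether state has become eligible for cleanup, while the actual removal stays best effort.

   ```java
   import java.time.Duration;

   // Hypothetical model of the documented contract, not Flink's implementation.
   final class IdleStateCleanupSketch {
       private IdleStateCleanupSketch() {}

       /**
        * Returns true once state last updated at lastUpdateMillis may be cleared at nowMillis.
        * A retention of zero means state is never cleaned up.
        */
       static boolean isEligibleForCleanup(long lastUpdateMillis, long nowMillis, Duration retention) {
           if (retention.isZero()) {
               return false;
           }
           long idleMillis = nowMillis - lastUpdateMillis;
           // Never cleared while idle for less than the retention time.
           return idleMillis >= retention.toMillis();
       }
   }
   ```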

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -274,9 +263,10 @@ public void setMaxGeneratedCodeLength(Integer maxGeneratedCodeLength) {
 	 *
 	 * @param minTime The minimum time interval for which idle state is retained. Set to 0 (zero) to
 	 *                never clean-up the state.
-	 * @param maxTime The maximum time interval for which idle state is retained. Must be at least
-	 *                5 minutes greater than minTime. Set to 0 (zero) to never clean-up the state.
+	 * @param maxTime Currently maxTime will be ignored and it will automatically derived from minTime

Review comment:
       I would suggest keeping the original description of `maxTime`, but adding a NOTE explaining that `maxTime` is now ignored and is inferred from `minTime` with a 1.5 multiplier.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
##########
@@ -286,20 +276,43 @@ public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
 		}
 		minIdleStateRetentionTime = minTime.toMilliseconds();
 		maxIdleStateRetentionTime = maxTime.toMilliseconds();
+		setIdleStateRetentionDuration(Duration.ofMillis(minIdleStateRetentionTime));
+	}
+
+	/**
+	 * Specifies a retention time interval for how long idle state, i.e., state which
+	 * was not updated, will be retained.
+	 * State will never be cleared until it was idle for less than the retention time and will never
+	 * be kept if it was idle for more than the 1.5 * retention time.
+	 *
+	 * <p>When new data arrives for previously cleaned-up state, the new data will be handled as if it
+	 * was the first data. This can result in previous results being overwritten.
+	 *
+	 * <p>Set to 0 (zero) to never clean-up the state.
+	 *
+	 * <p>NOTE: Cleaning up state requires additional bookkeeping which becomes less expensive for
+	 * larger differences of minTime and maxTime. The difference between minTime and maxTime must be
+	 * at least 5 minutes.
+	 *
+	 * @param duration The retention time interval for which idle state is retained. Set to 0 (zero) to
+	 *                never clean-up the state.

Review comment:
       Add a `@see` comment:
   
   ```
   * @see org.apache.flink.api.common.state.StateTtlConfig
   ```
   
   

##########
File path: flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
##########
@@ -63,7 +63,7 @@ public void testAppendStreamDoesNotOverwriteTableConfig() {
 			equalTo(minRetention.toMilliseconds()));
 		assertThat(
 			tEnv.getConfig().getMaxIdleStateRetentionTime(),
-			equalTo(maxRetention.toMilliseconds()));
+			equalTo(minRetention.toMilliseconds() * 3 / 2));

Review comment:
       Please use the new `setIdleStateRetentionTime` method to set the idle state retention in tests.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
##########
@@ -51,4 +52,24 @@ private TableConfigOptions() {}
 			.withDescription("The SQL dialect defines how to parse a SQL query. " +
 					"A different SQL dialect may support different SQL grammar. " +
 					"Currently supported dialects are: default and hive");
+
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+	public static final ConfigOption<String> LOCAL_TIME_ZONE = key("table.local-time-zone")
+			.stringType()
+			// special value to decide whether to use ZoneId.systemDefault() in TableConfig.getLocalTimeZone()
+			.defaultValue("System")
+			.withDescription("The local time zone defines current session time zone id. It is used when converting to/from " +
+				"TIMESTAMP_WITH_LOCAL_TIME_ZONE. Internally, timestamps with local time zone are always represented in the UTC time zone. " +

Review comment:
       ```suggestion
   				"<code>TIMESTAMP WITH LOCAL TIME ZONE</code>. Internally, timestamps with local time zone are always represented in the UTC time zone. " +
   ```
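   To illustrate what "represented in the UTC time zone" means, a small sketch using plain `java.time` (not Flink's internal conversion code; the helper names are hypothetical): a wall-clock value in the session time zone maps to a single UTC instant, which renders differently depending on the session zone.

   ```java
   import java.time.Instant;
   import java.time.LocalDateTime;
   import java.time.ZoneId;

   // Hypothetical helpers showing session-zone <-> UTC conversion with java.time.
   final class LocalTimeZoneConversionSketch {
       private LocalTimeZoneConversionSketch() {}

       // Wall-clock time in the session zone -> UTC instant (the internal form).
       static Instant toUtc(LocalDateTime wallClock, ZoneId sessionZone) {
           return wallClock.atZone(sessionZone).toInstant();
       }

       // UTC instant -> wall-clock time as seen from the session zone.
       static LocalDateTime fromUtc(Instant instant, ZoneId sessionZone) {
           return instant.atZone(sessionZone).toLocalDateTime();
       }
   }
   ```

   For example, `2020-07-27 16:19:05` in `Asia/Shanghai` corresponds to the instant `2020-07-27T08:19:05Z` and reads back as `08:19:05` in a `UTC` session.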

##########
File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/TableConfigTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TableConfig}.
+ */
+public class TableConfigTest {
+	@Test
+	public void testGetAndSetTableConfigOptions() throws Exception{
+		Class<?> configClass = TableConfig.class;
+		TableConfig config = new TableConfig();
+		for (TestSpec<?> spec: testSpecList){
+			configClass.getMethod("set" + spec.fieldName, spec.inputClass).invoke(config, spec.inputValue);
+			assertEquals(spec.expectedValue, configClass.getMethod("get" + spec.fieldName).invoke(config));

Review comment:
       I think we should avoid using Java reflection here; otherwise, the test will be hard to maintain when TableConfig is refactored again in the future.
   
   There aren't many cases to cover here, so we can simply have one test per method/option.
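   A sketch of the one-test-per-option style, written without JUnit so it runs standalone. In the real `TableConfigTest` each method would carry `@Test` and use `assertEquals`. `ConfigUnderTest` is a hypothetical minimal stand-in for `TableConfig`, not the real class:

   ```java
   import java.time.ZoneId;

   // Hypothetical minimal stand-in for TableConfig, only for this sketch.
   class ConfigUnderTest {
       private ZoneId localTimeZone = ZoneId.systemDefault();
       private Integer maxGeneratedCodeLength;

       void setLocalTimeZone(ZoneId zoneId) { this.localTimeZone = zoneId; }
       ZoneId getLocalTimeZone() { return localTimeZone; }

       void setMaxGeneratedCodeLength(Integer length) { this.maxGeneratedCodeLength = length; }
       Integer getMaxGeneratedCodeLength() { return maxGeneratedCodeLength; }
   }

   // One explicit test per option: renaming a getter or setter now fails at
   // compile time instead of at run time through getMethod(...).
   class TableConfigExplicitTests {
       static void testLocalTimeZone() {
           ConfigUnderTest config = new ConfigUnderTest();
           config.setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
           check(ZoneId.of("Asia/Shanghai").equals(config.getLocalTimeZone()));
       }

       static void testMaxGeneratedCodeLength() {
           ConfigUnderTest config = new ConfigUnderTest();
           config.setMaxGeneratedCodeLength(5000);
           check(Integer.valueOf(5000).equals(config.getMaxGeneratedCodeLength()));
       }

       private static void check(boolean condition) {
           if (!condition) {
               throw new AssertionError("check failed");
           }
       }
   }
   ```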




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org