You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 08:56:34 UTC
[flink] branch release-1.9 updated: [FLINK-13337][table-planner-blink] Do not need to backup and restore streamEnv config in BatchExecutor
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 8c151e9 [FLINK-13337][table-planner-blink] Do not need to backup and restore streamEnv config in BatchExecutor
8c151e9 is described below
commit 8c151e99e6bb4d7d1d0eec74872b9aa2f038e8ea
Author: Xupingyong <xu...@163.com>
AuthorDate: Fri Jul 19 16:39:53 2019 +0800
[FLINK-13337][table-planner-blink] Do not need to backup and restore streamEnv config in BatchExecutor
This closes #9179
---
.../apache/flink/table/executor/BatchExecutor.java | 55 ++-------------
.../flink/table/executor/BatchExecutorTest.scala | 81 ----------------------
2 files changed, 6 insertions(+), 130 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index adafab1..0cf5169 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -43,8 +43,6 @@ import java.util.List;
@Internal
public class BatchExecutor extends ExecutorBase {
- private BatchExecEnvConfig batchExecEnvConfig = new BatchExecEnvConfig();
-
@VisibleForTesting
public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
super(executionEnvironment);
@@ -58,10 +56,9 @@ public class BatchExecutor extends ExecutorBase {
}
/**
- * Backup previous streamEnv config and set batch configs.
+ * Sets batch configs.
*/
- private void backupAndUpdateStreamEnv(StreamExecutionEnvironment execEnv) {
- batchExecEnvConfig.backup(execEnv);
+ private void setBatchProperties(StreamExecutionEnvironment execEnv) {
ExecutionConfig executionConfig = execEnv.getConfig();
executionConfig.enableObjectReuse();
executionConfig.setLatencyTrackingInterval(-1);
@@ -77,7 +74,7 @@ public class BatchExecutor extends ExecutorBase {
*/
public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- backupAndUpdateStreamEnv(execEnv);
+ setBatchProperties(execEnv);
transformations.forEach(execEnv::addOperator);
StreamGraph streamGraph;
streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
@@ -91,11 +88,12 @@ public class BatchExecutor extends ExecutorBase {
streamGraph.setChaining(true);
streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
streamGraph.setStateBackend(null);
- streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
+ if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
+ throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
+ }
if (isShuffleModeAllBatch()) {
streamGraph.setBlockingConnectionsBetweenChains(true);
}
- batchExecEnvConfig.restore(execEnv);
return streamGraph;
}
@@ -109,45 +107,4 @@ public class BatchExecutor extends ExecutorBase {
}
return false;
}
-
- /**
- * Batch configs that are set in {@link StreamExecutionEnvironment}. We should backup and change
- * these configs and restore finally.
- */
- private static class BatchExecEnvConfig {
-
- private boolean enableObjectReuse;
- private long latencyTrackingInterval;
- private long bufferTimeout;
- private TimeCharacteristic timeCharacteristic;
- private InputDependencyConstraint inputDependencyConstraint;
-
- /**
- * Backup previous streamEnv config.
- */
- public void backup(StreamExecutionEnvironment execEnv) {
- ExecutionConfig executionConfig = execEnv.getConfig();
- enableObjectReuse = executionConfig.isObjectReuseEnabled();
- latencyTrackingInterval = executionConfig.getLatencyTrackingInterval();
- timeCharacteristic = execEnv.getStreamTimeCharacteristic();
- bufferTimeout = execEnv.getBufferTimeout();
- inputDependencyConstraint = executionConfig.getDefaultInputDependencyConstraint();
- }
-
- /**
- * Restore previous streamEnv after execute batch jobs.
- */
- public void restore(StreamExecutionEnvironment execEnv) {
- ExecutionConfig executionConfig = execEnv.getConfig();
- if (enableObjectReuse) {
- executionConfig.enableObjectReuse();
- } else {
- executionConfig.disableObjectReuse();
- }
- executionConfig.setLatencyTrackingInterval(latencyTrackingInterval);
- execEnv.setStreamTimeCharacteristic(timeCharacteristic);
- execEnv.setBufferTimeout(bufferTimeout);
- executionConfig.setDefaultInputDependencyConstraint(inputDependencyConstraint);
- }
- }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
deleted file mode 100644
index cca8e1a..0000000
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.executor
-
-import org.apache.flink.api.common.InputDependencyConstraint
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.transformations.ShuffleMode
-import org.apache.flink.table.api.ExecutionConfigOptions
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.util.{BatchTableTestUtil, TableTestBase}
-
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Before, Test}
-
-/**
- * Test for streamEnv config save and restore when run batch jobs.
- */
-class BatchExecutorTest extends TableTestBase {
-
- private var util: BatchTableTestUtil = _
-
- @Before
- def setUp(): Unit = {
- util = batchTestUtil()
- util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
- util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
- }
-
- @Test
- def testRestoreConfig(): Unit = {
- util.getStreamEnv.setBufferTimeout(11)
- util.getStreamEnv.getConfig.disableObjectReuse()
- util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
- util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
- util.verifyExplain("SELECT * FROM MyTable")
- assertEquals(11, util.getStreamEnv.getBufferTimeout)
- assertTrue(!util.getStreamEnv.getConfig.isObjectReuseEnabled)
- assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
- assertEquals(TimeCharacteristic.EventTime, util.getStreamEnv.getStreamTimeCharacteristic)
- assertEquals(InputDependencyConstraint.ANY,
- util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
- }
-
- @Test
- def testRestoreConfigWhenBatchShuffleMode(): Unit = {
- util.getTableEnv.getConfig.getConfiguration.setString(
- ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE,
- ShuffleMode.BATCH.toString)
- util.getStreamEnv.setBufferTimeout(11)
- util.getStreamEnv.getConfig.disableObjectReuse()
- util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
- util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
- util.verifyExplain("SELECT * FROM MyTable")
- assertEquals(11, util.getStreamEnv.getBufferTimeout)
- assertTrue(!util.getStreamEnv.getConfig.isObjectReuseEnabled)
- assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
- assertEquals(TimeCharacteristic.EventTime, util.getStreamEnv.getStreamTimeCharacteristic)
- assertEquals(InputDependencyConstraint.ANY,
- util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
- }
-}