Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 08:56:34 UTC

[flink] branch release-1.9 updated: [FLINK-13337][table-planner-blink] Do not need to backup and restore streamEnv config in BatchExecutor

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 8c151e9  [FLINK-13337][table-planner-blink] Do not need to backup and restore streamEnv config in BatchExecutor
8c151e9 is described below

commit 8c151e99e6bb4d7d1d0eec74872b9aa2f038e8ea
Author: Xupingyong <xu...@163.com>
AuthorDate: Fri Jul 19 16:39:53 2019 +0800

    [FLINK-13337][table-planner-blink] Do not need to backup and restore streamEnv config in BatchExecutor
    
    This closes #9179
---
 .../apache/flink/table/executor/BatchExecutor.java | 55 ++-------------
 .../flink/table/executor/BatchExecutorTest.scala   | 81 ----------------------
 2 files changed, 6 insertions(+), 130 deletions(-)
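In effect, the patch removes the BatchExecEnvConfig backup/restore step entirely: BatchExecutor now applies its batch settings directly to the StreamExecutionEnvironment, and instead of overriding the checkpoint interval and restoring it afterwards it rejects an environment that has checkpointing enabled. A condensed sketch of the resulting code path, pieced together from the hunks below (class boilerplate, imports and unchanged lines are elided):

    private void setBatchProperties(StreamExecutionEnvironment execEnv) {
        ExecutionConfig executionConfig = execEnv.getConfig();
        executionConfig.enableObjectReuse();
        executionConfig.setLatencyTrackingInterval(-1);
        // ... further batch-only settings, unchanged by this commit
    }

    public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
        StreamExecutionEnvironment execEnv = getExecutionEnvironment();
        setBatchProperties(execEnv);                 // no backup of the previous config
        transformations.forEach(execEnv::addOperator);
        StreamGraph streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
        // ... chaining, schedule mode and state backend are set as before ...
        if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
            throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
        }
        if (isShuffleModeAllBatch()) {
            streamGraph.setBlockingConnectionsBetweenChains(true);
        }
        return streamGraph;                          // nothing to restore afterwards
    }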

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index adafab1..0cf5169 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -43,8 +43,6 @@ import java.util.List;
 @Internal
 public class BatchExecutor extends ExecutorBase {
 
-	private BatchExecEnvConfig batchExecEnvConfig = new BatchExecEnvConfig();
-
 	@VisibleForTesting
 	public BatchExecutor(StreamExecutionEnvironment executionEnvironment) {
 		super(executionEnvironment);
@@ -58,10 +56,9 @@ public class BatchExecutor extends ExecutorBase {
 	}
 
 	/**
-	 * Backup previous streamEnv config and set batch configs.
+	 * Sets batch configs.
 	 */
-	private void backupAndUpdateStreamEnv(StreamExecutionEnvironment execEnv) {
-		batchExecEnvConfig.backup(execEnv);
+	private void setBatchProperties(StreamExecutionEnvironment execEnv) {
 		ExecutionConfig executionConfig = execEnv.getConfig();
 		executionConfig.enableObjectReuse();
 		executionConfig.setLatencyTrackingInterval(-1);
@@ -77,7 +74,7 @@ public class BatchExecutor extends ExecutorBase {
 	 */
 	public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
 		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-		backupAndUpdateStreamEnv(execEnv);
+		setBatchProperties(execEnv);
 		transformations.forEach(execEnv::addOperator);
 		StreamGraph streamGraph;
 		streamGraph = execEnv.getStreamGraph(getNonEmptyJobName(jobName));
@@ -91,11 +88,12 @@ public class BatchExecutor extends ExecutorBase {
 		streamGraph.setChaining(true);
 		streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
 		streamGraph.setStateBackend(null);
-		streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
+		if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
+			throw new IllegalArgumentException("Checkpoint is not supported for batch jobs.");
+		}
 		if (isShuffleModeAllBatch()) {
 			streamGraph.setBlockingConnectionsBetweenChains(true);
 		}
-		batchExecEnvConfig.restore(execEnv);
 		return streamGraph;
 	}
 
@@ -109,45 +107,4 @@ public class BatchExecutor extends ExecutorBase {
 		}
 		return false;
 	}
-
-	/**
-	 * Batch configs that are set in {@link StreamExecutionEnvironment}. We should backup and change
-	 * these configs and restore finally.
-	 */
-	private static class BatchExecEnvConfig {
-
-		private boolean enableObjectReuse;
-		private long latencyTrackingInterval;
-		private long bufferTimeout;
-		private TimeCharacteristic timeCharacteristic;
-		private InputDependencyConstraint inputDependencyConstraint;
-
-		/**
-		 * Backup previous streamEnv config.
-		 */
-		public void backup(StreamExecutionEnvironment execEnv) {
-			ExecutionConfig executionConfig = execEnv.getConfig();
-			enableObjectReuse = executionConfig.isObjectReuseEnabled();
-			latencyTrackingInterval = executionConfig.getLatencyTrackingInterval();
-			timeCharacteristic = execEnv.getStreamTimeCharacteristic();
-			bufferTimeout = execEnv.getBufferTimeout();
-			inputDependencyConstraint = executionConfig.getDefaultInputDependencyConstraint();
-		}
-
-		/**
-		 * Restore previous streamEnv after execute batch jobs.
-		 */
-		public void restore(StreamExecutionEnvironment execEnv) {
-			ExecutionConfig executionConfig = execEnv.getConfig();
-			if (enableObjectReuse) {
-				executionConfig.enableObjectReuse();
-			} else {
-				executionConfig.disableObjectReuse();
-			}
-			executionConfig.setLatencyTrackingInterval(latencyTrackingInterval);
-			execEnv.setStreamTimeCharacteristic(timeCharacteristic);
-			execEnv.setBufferTimeout(bufferTimeout);
-			executionConfig.setDefaultInputDependencyConstraint(inputDependencyConstraint);
-		}
-	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
deleted file mode 100644
index cca8e1a..0000000
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/executor/BatchExecutorTest.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.executor
-
-import org.apache.flink.api.common.InputDependencyConstraint
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.transformations.ShuffleMode
-import org.apache.flink.table.api.ExecutionConfigOptions
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.util.{BatchTableTestUtil, TableTestBase}
-
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Before, Test}
-
-/**
-  * Test for streamEnv config save and restore when run batch jobs.
-  */
-class BatchExecutorTest extends TableTestBase {
-
-  private var util: BatchTableTestUtil = _
-
-  @Before
-  def setUp(): Unit = {
-    util = batchTestUtil()
-    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c)
-    util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f)
-  }
-
-  @Test
-  def testRestoreConfig(): Unit = {
-    util.getStreamEnv.setBufferTimeout(11)
-    util.getStreamEnv.getConfig.disableObjectReuse()
-    util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
-    util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
-    util.verifyExplain("SELECT * FROM MyTable")
-    assertEquals(11, util.getStreamEnv.getBufferTimeout)
-    assertTrue(!util.getStreamEnv.getConfig.isObjectReuseEnabled)
-    assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
-    assertEquals(TimeCharacteristic.EventTime, util.getStreamEnv.getStreamTimeCharacteristic)
-    assertEquals(InputDependencyConstraint.ANY,
-      util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
-  }
-
-  @Test
-  def testRestoreConfigWhenBatchShuffleMode(): Unit = {
-    util.getTableEnv.getConfig.getConfiguration.setString(
-      ExecutionConfigOptions.SQL_EXEC_SHUFFLE_MODE,
-      ShuffleMode.BATCH.toString)
-    util.getStreamEnv.setBufferTimeout(11)
-    util.getStreamEnv.getConfig.disableObjectReuse()
-    util.getStreamEnv.getConfig.setLatencyTrackingInterval(100)
-    util.getStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    util.getStreamEnv.getConfig.setDefaultInputDependencyConstraint(InputDependencyConstraint.ANY)
-    util.verifyExplain("SELECT * FROM MyTable")
-    assertEquals(11, util.getStreamEnv.getBufferTimeout)
-    assertTrue(!util.getStreamEnv.getConfig.isObjectReuseEnabled)
-    assertEquals(100, util.getStreamEnv.getConfig.getLatencyTrackingInterval)
-    assertEquals(TimeCharacteristic.EventTime, util.getStreamEnv.getStreamTimeCharacteristic)
-    assertEquals(InputDependencyConstraint.ANY,
-      util.getStreamEnv.getConfig.getDefaultInputDependencyConstraint)
-  }
-}