You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/05/28 17:50:17 UTC

[flink] branch master updated: [FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.memory-threshold'

This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a835f31  [FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.memory-threshold'
a835f31 is described below

commit a835f31a3a78f34b4a80f9e634b34c6a6681a482
Author: Yun Tang <my...@live.com>
AuthorDate: Thu May 21 20:37:56 2020 +0800

    [FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.memory-threshold'
    
    This closes #12282.
---
 .../generated/checkpointing_configuration.html     |  6 ++--
 .../generated/expert_state_backends_section.html   |  6 ++--
 .../flink/configuration/CheckpointingOptions.java  |  7 ++--
 .../state/api/output/SavepointOutputFormat.java    |  2 +-
 .../pyflink/datastream/tests/test_state_backend.py |  2 +-
 .../runtime/state/filesystem/FsStateBackend.java   | 39 ++++++++++++++--------
 .../runtime/state/StateBackendLoadingTest.java     | 17 +++++-----
 .../flink/test/checkpointing/SavepointITCase.java  |  4 +--
 .../utils/SavepointMigrationTestBase.java          |  3 +-
 9 files changed, 50 insertions(+), 36 deletions(-)

diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html
index c8517a5..748bdb2 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -22,9 +22,9 @@
         </tr>
         <tr>
             <td><h5>state.backend.fs.memory-threshold</h5></td>
-            <td style="word-wrap: break-word;">1024</td>
-            <td>Integer</td>
-            <td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file.</td>
+            <td style="word-wrap: break-word;">20 kb</td>
+            <td>MemorySize</td>
+            <td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.</td>
         </tr>
         <tr>
             <td><h5>state.backend.fs.write-buffer-size</h5></td>
diff --git a/docs/_includes/generated/expert_state_backends_section.html b/docs/_includes/generated/expert_state_backends_section.html
index 9d50be1..0fed867 100644
--- a/docs/_includes/generated/expert_state_backends_section.html
+++ b/docs/_includes/generated/expert_state_backends_section.html
@@ -16,9 +16,9 @@
         </tr>
         <tr>
             <td><h5>state.backend.fs.memory-threshold</h5></td>
-            <td style="word-wrap: break-word;">1024</td>
-            <td>Integer</td>
-            <td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file.</td>
+            <td style="word-wrap: break-word;">20 kb</td>
+            <td>MemorySize</td>
+            <td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.</td>
         </tr>
         <tr>
             <td><h5>state.backend.fs.write-buffer-size</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index df19ab9..16eaf75 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -140,11 +140,12 @@ public class CheckpointingOptions {
 	/** The minimum size of state data files. All state chunks smaller than that
 	 * are stored inline in the root checkpoint metadata file. */
 	@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
-	public static final ConfigOption<Integer> FS_SMALL_FILE_THRESHOLD = ConfigOptions
+	public static final ConfigOption<MemorySize> FS_SMALL_FILE_THRESHOLD = ConfigOptions
 			.key("state.backend.fs.memory-threshold")
-			.defaultValue(1024)
+			.memoryType()
+			.defaultValue(MemorySize.parse("20kb"))
 			.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
-				" inline in the root checkpoint metadata file.");
+				" inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.");
 
 	/**
 	 * The default size of the write buffer for the checkpoint streams that write to file systems.
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
index 8235067..bebd435 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SavepointOutputFormat.java
@@ -93,7 +93,7 @@ public class SavepointOutputFormat extends RichOutputFormat<CheckpointMetadata>
 			location,
 			location,
 			reference,
-			CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(),
+			(int) CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
 			CheckpointingOptions.FS_WRITE_BUFFER_SIZE.defaultValue());
 	}
 }
diff --git a/flink-python/pyflink/datastream/tests/test_state_backend.py b/flink-python/pyflink/datastream/tests/test_state_backend.py
index 4f3249f..3ce18dc 100644
--- a/flink-python/pyflink/datastream/tests/test_state_backend.py
+++ b/flink-python/pyflink/datastream/tests/test_state_backend.py
@@ -97,7 +97,7 @@ class FsStateBackendTests(PyFlinkTestCase):
 
         state_backend = FsStateBackend("file://var/checkpoints/")
 
-        self.assertEqual(state_backend.get_min_file_size_threshold(), 1024)
+        self.assertEqual(state_backend.get_min_file_size_threshold(), 20480)
 
         state_backend = FsStateBackend("file://var/checkpoints/", file_state_size_threshold=2048)
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index e45fb20..1c61ab6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.TernaryBoolean;
 
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 
+import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -363,22 +365,24 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 		this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
 				configuration.get(CheckpointingOptions.ASYNC_SNAPSHOTS));
 
-		final int sizeThreshold = original.fileStateThreshold >= 0 ?
-				original.fileStateThreshold :
-				configuration.get(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);
+		if (getValidFileStateThreshold(original.fileStateThreshold) >= 0) {
+			this.fileStateThreshold = original.fileStateThreshold;
+		} else {
+			final int configuredStateThreshold =
+				getValidFileStateThreshold(configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes());
 
-		if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) {
-			this.fileStateThreshold = sizeThreshold;
-		}
-		else {
-			this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
+			if (configuredStateThreshold >= 0) {
+				this.fileStateThreshold = configuredStateThreshold;
+			} else {
+				this.fileStateThreshold = MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
 
-			// because this is the only place we (unlikely) ever log, we lazily
-			// create the logger here
-			LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
+				// because this is the only place we (unlikely) ever log, we lazily
+				// create the logger here
+				LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
 					"Ignoring invalid file size threshold value ({}): {} - using default value {} instead.",
-					CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
-					CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
+					FS_SMALL_FILE_THRESHOLD.key(), configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes(),
+					FS_SMALL_FILE_THRESHOLD.defaultValue());
+			}
 		}
 
 		final int bufferSize = original.writeBufferSize >= 0 ?
@@ -388,6 +392,13 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 		this.writeBufferSize = Math.max(bufferSize, this.fileStateThreshold);
 	}
 
+	private int getValidFileStateThreshold(long fileStateThreshold) {
+		if (fileStateThreshold >= 0 && fileStateThreshold <= MAX_FILE_STATE_THRESHOLD) {
+			return (int) fileStateThreshold;
+		}
+		return -1;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Properties
 	// ------------------------------------------------------------------------
@@ -432,7 +443,7 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
 	public int getMinFileSizeThreshold() {
 		return fileStateThreshold >= 0 ?
 				fileStateThreshold :
-				CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
+				MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
index 877a51d..5340290 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -227,7 +228,7 @@ public class StateBackendLoadingTest {
 		final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
 		final Path expectedCheckpointsPath = new Path(checkpointDir);
 		final Path expectedSavepointsPath = new Path(savepointDir);
-		final int threshold = 1000000;
+		final MemorySize threshold = MemorySize.parse("900kb");
 		final int minWriteBufferSize = 1024;
 		final boolean async = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();
 
@@ -237,7 +238,7 @@ public class StateBackendLoadingTest {
 		config1.setString(backendKey, "filesystem");
 		config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
 		config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
-		config1.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
+		config1.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
 		config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize);
 		config1.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
 
@@ -245,7 +246,7 @@ public class StateBackendLoadingTest {
 		config2.setString(backendKey, FsStateBackendFactory.class.getName());
 		config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
 		config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
-		config2.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
+		config2.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
 		config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize);
 		config2.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);
 
@@ -262,10 +263,10 @@ public class StateBackendLoadingTest {
 		assertEquals(expectedCheckpointsPath, fs2.getCheckpointPath());
 		assertEquals(expectedSavepointsPath, fs1.getSavepointPath());
 		assertEquals(expectedSavepointsPath, fs2.getSavepointPath());
-		assertEquals(threshold, fs1.getMinFileSizeThreshold());
-		assertEquals(threshold, fs2.getMinFileSizeThreshold());
-		assertEquals(Math.max(threshold, minWriteBufferSize), fs1.getWriteBufferSize());
-		assertEquals(Math.max(threshold, minWriteBufferSize), fs2.getWriteBufferSize());
+		assertEquals(threshold.getBytes(), fs1.getMinFileSizeThreshold());
+		assertEquals(threshold.getBytes(), fs2.getMinFileSizeThreshold());
+		assertEquals(Math.max(threshold.getBytes(), minWriteBufferSize), fs1.getWriteBufferSize());
+		assertEquals(Math.max(threshold.getBytes(), minWriteBufferSize), fs2.getWriteBufferSize());
 		assertEquals(async, fs1.isUsingAsynchronousSnapshots());
 		assertEquals(async, fs2.isUsingAsynchronousSnapshots());
 	}
@@ -293,7 +294,7 @@ public class StateBackendLoadingTest {
 		config.setString(backendKey, "jobmanager"); // this should not be picked up 
 		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
-		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 20); // this should not be picked up
+		config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20")); // this should not be picked up
 		config.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 3000000); // this should not be picked up
 
 		final StateBackend loadedBackend =
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index be9b706..abdc188 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -576,7 +576,7 @@ public class SavepointITCase extends TestLogger {
 			if (data == null) {
 				// We need this to be large, because we want to test with files
 				Random rand = new Random(getRuntimeContext().getIndexOfThisSubtask());
-				data = new byte[CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() + 1];
+				data = new byte[(int) CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes() + 1];
 				rand.nextBytes(data);
 			}
 		}
@@ -833,7 +833,7 @@ public class SavepointITCase extends TestLogger {
 		final Configuration config = new Configuration();
 		config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
 		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
-		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+		config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
 		return config;
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 638edb5..1e7f619 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -109,7 +110,7 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 
 		config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
 		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
-		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+		config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 		config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 300L);