You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/09 08:23:42 UTC

[GitHub] [flink] carp84 commented on a change in pull request #13393: [FLINK-19238] [RocksDB] Sanity check for arena block size

carp84 commented on a change in pull request #13393:
URL: https://github.com/apache/flink/pull/13393#discussion_r502211558



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -131,22 +133,23 @@ public static void registerKvStateInformation(
 		RegisteredStateMetaInfoBase metaInfoBase,
 		RocksDB db,
 		Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
-		@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
+		@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, Long writeBufferManagerCapacity) {

Review comment:
       ```suggestion
   		@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
   		@Nullable Long writeBufferManagerCapacity) {
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
##########
@@ -151,7 +151,7 @@ public void testGetColumnFamilyOptionsWithSharedResources() throws Exception {
 		final long cacheSize = 1024L, writeBufferSize = 512L;
 		final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1);
 		final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, cache);
-		RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm);
+		RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm, 512L);

Review comment:
       ```suggestion
   		RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm, writeBufferSize);
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -131,22 +133,23 @@ public static void registerKvStateInformation(
 		RegisteredStateMetaInfoBase metaInfoBase,
 		RocksDB db,
 		Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
-		@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
+		@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, Long writeBufferManagerCapacity) {
 
 		ColumnFamilyDescriptor columnFamilyDescriptor = createColumnFamilyDescriptor(
-			metaInfoBase, columnFamilyOptionsFactory, ttlCompactFiltersManager);
+			metaInfoBase, columnFamilyOptionsFactory, ttlCompactFiltersManager, writeBufferManagerCapacity);
 		return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(createColumnFamily(columnFamilyDescriptor, db), metaInfoBase);
 	}
 
 	/**
-	 * Creates a column descriptor for sate column family.
+	 * Creates a column descriptor for a state column family.
 	 *
 	 * <p>Sets TTL compaction filter if {@code ttlCompactFiltersManager} is not {@code null}.
 	 */
 	public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
 		RegisteredStateMetaInfoBase metaInfoBase,
 		Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
-		@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
+		@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
+		Long writeBufferManagerCapacity) {

Review comment:
       ```suggestion
   		@Nullable Long writeBufferManagerCapacity) {
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +82,60 @@ public void testPathExceptionOnWindows() throws Exception {
 		}
 	}
 
+	@Test
+	public void testSanityCheckArenaBlockSize() {
+		List<TestData> tests = Arrays.asList(
+			new TestData(67108864, 0, 8388608, false),
+			new TestData(67108864, 8388608, 8388608, false),
+			new TestData(67108864, 0, 11184810, true),
+			new TestData(67108864, 8388608, 11184810, true)
+		);
+
+		for (TestData test : tests) {
+			long writeBufferSize = test.getWriteBufferSize();
+			long arenaBlockSizeConfigured = test.getArenaBlockSizeConfigured();
+			long writeBufferManagerCapacity = test.getWriteBufferManagerCapacity();
+			boolean expected = test.isExpected();
+
+			boolean isOk = RocksDBOperationUtils.sanityCheckArenaBlockSize(writeBufferSize, arenaBlockSizeConfigured, writeBufferManagerCapacity);
+			if (expected) {
+				assertTrue(isOk);
+			} else {
+				assertFalse(isOk);
+			}
+		}
+	}
+
+	private static class TestData {
+		private final long writeBufferSize;
+		private final long arenaBlockSizeConfigured;
+		private final long writeBufferManagerCapacity;
+		private final boolean expected;
+
+		public TestData(long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity, boolean expected) {
+			this.writeBufferSize = writeBufferSize;
+			this.arenaBlockSizeConfigured = arenaBlockSizeConfigured;
+			this.writeBufferManagerCapacity = writeBufferManagerCapacity;
+			this.expected = expected;
+		}
+
+		public long getWriteBufferSize() {
+			return writeBufferSize;
+		}
+
+		public long getArenaBlockSizeConfigured() {
+			return arenaBlockSizeConfigured;
+		}
+
+		public long getWriteBufferManagerCapacity() {
+			return writeBufferManagerCapacity;
+		}
+
+		public boolean isExpected() {

Review comment:
       ```suggestion
   		public boolean expectedResult() {
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +82,60 @@ public void testPathExceptionOnWindows() throws Exception {
 		}
 	}
 
+	@Test
+	public void testSanityCheckArenaBlockSize() {
+		List<TestData> tests = Arrays.asList(
+			new TestData(67108864, 0, 8388608, false),
+			new TestData(67108864, 8388608, 8388608, false),
+			new TestData(67108864, 0, 11184810, true),
+			new TestData(67108864, 8388608, 11184810, true)
+		);

Review comment:
       ```suggestion
   		long testWriteBufferSize = 56 * 1024 * 1024L;
   		long testDefaultArenaSize = testWriteBufferSize / 8;
   		long testValidArenaSize = testWriteBufferSize / 7;
   		long testInvalidArenaSize = testWriteBufferSize / 7 - 8L;
   		List<TestData> tests = Arrays.asList(
   			new TestData(testWriteBufferSize, 0, testInvalidArenaSize, false),
   			new TestData(testWriteBufferSize, testDefaultArenaSize, testInvalidArenaSize, false),
   			new TestData(testWriteBufferSize, 0, testValidArenaSize, true),
   			new TestData(testWriteBufferSize, testDefaultArenaSize, testValidArenaSize, true)
   		);
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,45 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
 	static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) {
 		return new WriteBufferManager(writeBufferManagerCapacity, cache);
 	}
+
+	/**
+	 * Calculate the default arena block size as RocksDB calculates it in
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"/>.
+	 *
+	 * @return the default arena block size
+	 * @param writeBufferSize the write buffer size (bytes)
+	 */
+	static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) {
+		return writeBufferSize / 8;
+	}
+
+	/**
+	 * Calculate {@code mutable_limit_} as RocksDB calculates it in
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54"/>.
+	 *
+	 * @param bufferSize write buffer size
+	 * @return mutableLimit
+	 */
+	static long calculateRocksDBMutableLimit(long bufferSize) {
+		return bufferSize * 7 / 8;
+	}
+
+	/**
+	 * RocksDB starts flushing the active memtable constantly in the case when the arena block size is greater than
+	 * mutable limit (as calculated in {@link #calculateRocksDBMutableLimit(long)}).
+	 *
+	 * <p>This happens because in such a case the check here
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L47"/>
+	 * is always true.
+	 *
+	 * <p>This method checks that arena block size is smaller than mutable limit.
+	 *
+	 * @param arenaBlockSize Arena block size
+	 * @param mutableLimit mutable limit
+	 * @return whether arena block size is sensible
+	 */
+	@VisibleForTesting
+	static boolean validateArenaBlockSize(long arenaBlockSize, long mutableLimit) {
+		return arenaBlockSize < mutableLimit;

Review comment:
       ```suggestion
   		return arenaBlockSize <= mutableLimit;
   ```
   It should be ok if the arena block size is equal to the mutable limit, according to the RocksDB code [here](https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47)?

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -156,9 +159,49 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
 		Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
 			"The chosen state name 'default' collides with the name of the default column family!");
 
+		if (writeBufferManagerCapacity != null) {
+			// It'd be great to perform the check earlier, e.g. when creating write buffer manager.
+			// Unfortunately the check needs write buffer size that was just calculated.
+			sanityCheckArenaBlockSize(options.writeBufferSize(), options.arenaBlockSize(), writeBufferManagerCapacity);
+		}
+
 		return new ColumnFamilyDescriptor(nameBytes, options);
 	}
 
+	/**
+	 * Logs a warning of the arena block size is too high causing RocksDB to flush constantly.
+	 * Essentially, the condition here
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47"/>
+	 * will always be true.
+	 *
+	 * @param writeBufferSize the size of write buffer (bytes)
+	 * @param arenaBlockSizeConfigured the manually configured arena block size
+	 * @param writeBufferManagerCapacity the size of the write buffer manager (bytes)
+	 * @return true is sanity check passes, false otherwise

Review comment:
       ```suggestion
   	 * @return true if the sanity check passes, false otherwise
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -156,9 +159,49 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
 		Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
 			"The chosen state name 'default' collides with the name of the default column family!");
 
+		if (writeBufferManagerCapacity != null) {
+			// It'd be great to perform the check earlier, e.g. when creating write buffer manager.
+			// Unfortunately the check needs write buffer size that was just calculated.
+			sanityCheckArenaBlockSize(options.writeBufferSize(), options.arenaBlockSize(), writeBufferManagerCapacity);
+		}
+
 		return new ColumnFamilyDescriptor(nameBytes, options);
 	}
 
+	/**
+	 * Logs a warning of the arena block size is too high causing RocksDB to flush constantly.

Review comment:
       ```suggestion
   	 * Logs a warning if the arena block size is too high causing RocksDB to flush constantly.
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,45 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
 	static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) {
 		return new WriteBufferManager(writeBufferManagerCapacity, cache);
 	}
+
+	/**
+	 * Calculate the default arena block size as RocksDB calculates it in
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"/>.
+	 *
+	 * @return the default arena block size
+	 * @param writeBufferSize the write buffer size (bytes)
+	 */
+	static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) {
+		return writeBufferSize / 8;
+	}
+
+	/**
+	 * Calculate {@code mutable_limit_} as RocksDB calculates it in
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54"/>.

Review comment:
       ```suggestion
   	 * Calculate {@code mutable_limit_} as RocksDB calculates it
   	 * <a href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54">
   	 * here</a>.
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,45 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
 	static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) {
 		return new WriteBufferManager(writeBufferManagerCapacity, cache);
 	}
+
+	/**
+	 * Calculate the default arena block size as RocksDB calculates it in
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"/>.
+	 *
+	 * @return the default arena block size
+	 * @param writeBufferSize the write buffer size (bytes)
+	 */
+	static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) {
+		return writeBufferSize / 8;
+	}
+
+	/**
+	 * Calculate {@code mutable_limit_} as RocksDB calculates it in
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54"/>.
+	 *
+	 * @param bufferSize write buffer size
+	 * @return mutableLimit
+	 */
+	static long calculateRocksDBMutableLimit(long bufferSize) {
+		return bufferSize * 7 / 8;
+	}
+
+	/**
+	 * RocksDB starts flushing the active memtable constantly in the case when the arena block size is greater than
+	 * mutable limit (as calculated in {@link #calculateRocksDBMutableLimit(long)}).
+	 *
+	 * <p>This happens because in such a case the check here
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L47"/>
+	 * is always true.

Review comment:
       ```suggestion
   	 * <p>This happens because in such a case the check
   	 * <a href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L47">
   	 * here</a> is always true.
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,45 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
 	static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) {
 		return new WriteBufferManager(writeBufferManagerCapacity, cache);
 	}
+
+	/**
+	 * Calculate the default arena block size as RocksDB calculates it in
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"/>.
+	 *
+	 * @return the default arena block size
+	 * @param writeBufferSize the write buffer size (bytes)
+	 */
+	static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) {
+		return writeBufferSize / 8;
+	}
+
+	/**
+	 * Calculate {@code mutable_limit_} as RocksDB calculates it in
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54"/>.
+	 *
+	 * @param bufferSize write buffer size
+	 * @return mutableLimit
+	 */
+	static long calculateRocksDBMutableLimit(long bufferSize) {
+		return bufferSize * 7 / 8;
+	}
+
+	/**
+	 * RocksDB starts flushing the active memtable constantly in the case when the arena block size is greater than
+	 * mutable limit (as calculated in {@link #calculateRocksDBMutableLimit(long)}).
+	 *
+	 * <p>This happens because in such a case the check here
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L47"/>
+	 * is always true.

Review comment:
       ```suggestion
   	 * <p>This happens because in such a case the check
   	 * <a href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L47">
   	 * here</a> is always true.
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,45 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
 	static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) {
 		return new WriteBufferManager(writeBufferManagerCapacity, cache);
 	}
+
+	/**
+	 * Calculate the default arena block size as RocksDB calculates it in
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"/>.

Review comment:
       ```suggestion
   	 * Calculate the default arena block size as RocksDB calculates it
   	 * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196">
   	 * here</a>.
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +82,60 @@ public void testPathExceptionOnWindows() throws Exception {
 		}
 	}
 
+	@Test
+	public void testSanityCheckArenaBlockSize() {
+		List<TestData> tests = Arrays.asList(
+			new TestData(67108864, 0, 8388608, false),
+			new TestData(67108864, 8388608, 8388608, false),
+			new TestData(67108864, 0, 11184810, true),
+			new TestData(67108864, 8388608, 11184810, true)
+		);
+
+		for (TestData test : tests) {
+			long writeBufferSize = test.getWriteBufferSize();
+			long arenaBlockSizeConfigured = test.getArenaBlockSizeConfigured();
+			long writeBufferManagerCapacity = test.getWriteBufferManagerCapacity();
+			boolean expected = test.isExpected();

Review comment:
       ```suggestion
   			boolean expected = test.expectedResult();
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +82,60 @@ public void testPathExceptionOnWindows() throws Exception {
 		}
 	}
 
+	@Test
+	public void testSanityCheckArenaBlockSize() {
+		List<TestData> tests = Arrays.asList(
+			new TestData(67108864, 0, 8388608, false),
+			new TestData(67108864, 8388608, 8388608, false),
+			new TestData(67108864, 0, 11184810, true),
+			new TestData(67108864, 8388608, 11184810, true)
+		);
+
+		for (TestData test : tests) {
+			long writeBufferSize = test.getWriteBufferSize();
+			long arenaBlockSizeConfigured = test.getArenaBlockSizeConfigured();
+			long writeBufferManagerCapacity = test.getWriteBufferManagerCapacity();
+			boolean expected = test.isExpected();
+
+			boolean isOk = RocksDBOperationUtils.sanityCheckArenaBlockSize(writeBufferSize, arenaBlockSizeConfigured, writeBufferManagerCapacity);
+			if (expected) {
+				assertTrue(isOk);
+			} else {
+				assertFalse(isOk);
+			}

Review comment:
       ```suggestion
   			boolean sanityCheckResult = RocksDBOperationUtils.sanityCheckArenaBlockSize(writeBufferSize, arenaBlockSizeConfigured, writeBufferManagerCapacity);
   			assertThat(sanityCheckResult, is(expected));
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -32,10 +32,14 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;

Review comment:
       ```suggestion
   import static org.junit.Assert.assertThat;
   ```

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -156,9 +159,49 @@ public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
 		Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
 			"The chosen state name 'default' collides with the name of the default column family!");
 
+		if (writeBufferManagerCapacity != null) {
+			// It'd be great to perform the check earlier, e.g. when creating write buffer manager.
+			// Unfortunately the check needs write buffer size that was just calculated.
+			sanityCheckArenaBlockSize(options.writeBufferSize(), options.arenaBlockSize(), writeBufferManagerCapacity);
+		}
+
 		return new ColumnFamilyDescriptor(nameBytes, options);
 	}
 
+	/**
+	 * Logs a warning of the arena block size is too high causing RocksDB to flush constantly.
+	 * Essentially, the condition here
+	 * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47"/>
+	 * will always be true.
+	 *
+	 * @param writeBufferSize the size of write buffer (bytes)
+	 * @param arenaBlockSizeConfigured the manually configured arena block size
+	 * @param writeBufferManagerCapacity the size of the write buffer manager (bytes)
+	 * @return true is sanity check passes, false otherwise
+	 */
+	static boolean sanityCheckArenaBlockSize(
+		long writeBufferSize,
+		long arenaBlockSizeConfigured,
+		long writeBufferManagerCapacity) throws IllegalStateException {
+
+		long defaultArenaBlockSize = RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize);
+		long arenaBlockSize = arenaBlockSizeConfigured <= 0 ? defaultArenaBlockSize : arenaBlockSizeConfigured;
+		long mutableLimit = RocksDBMemoryControllerUtils.calculateRocksDBMutableLimit(writeBufferManagerCapacity);
+		if (RocksDBMemoryControllerUtils.validateArenaBlockSize(arenaBlockSize, mutableLimit)) {
+			return true;
+		} else {
+			LOG.warn("RocksDBStateBackend performance will be poor because of the current Flink memory configuration! " +
+					"RocksDB will flush memtable constantly, causing high IO and CPU. " +
+					"Typically the easiest fix is to increase task manager managed memory size. " +
+					"If running locally, see the parameter taskmanager.memory.managed.size. " +
+					"Details: arenaBlockSize {} < mutableLimit {} (writeBufferSize = {}, arenaBlockSizeConfigured = {}," +

Review comment:
       ```suggestion
   					"Details: arenaBlockSize {} > mutableLimit {} (writeBufferSize = {}, arenaBlockSizeConfigured = {}," +
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org