You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/16 13:09:11 UTC

flink git commit: [FLINK-5352] [rocksdb] Restore 1.1.3 RocksDB memory footprint

Repository: flink
Updated Branches:
  refs/heads/release-1.1 9ed7752eb -> a8b415fdf


[FLINK-5352] [rocksdb] Restore 1.1.3 RocksDB memory footprint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8b415fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8b415fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8b415fd

Branch: refs/heads/release-1.1
Commit: a8b415fdfdda0529b9689b8ef801f3c868a42e81
Parents: 9ed7752
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Dec 16 12:32:02 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 16 14:07:43 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/PredefinedOptions.java      | 32 ++++++++++++++++++--
 .../streaming/state/RocksDBStateBackend.java    |  6 ++--
 .../state/RocksDBStateBackendConfigTest.java    | 10 +++---
 3 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8b415fd/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index 93aac85..af85fa7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -35,14 +35,13 @@ import org.rocksdb.StringAppendOperator;
 public enum PredefinedOptions {
 
 	/**
-	 * Default options for all settings, except that writes are not forced to the
-	 * disk.
+	 * Default options for all settings, except that writes are not forced to the disk.
 	 * 
 	 * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
 	 * there is no need to sync data to stable storage.
 	 */
 	DEFAULT {
-		
+
 		@Override
 		public DBOptions createDBOptions() {
 			return new DBOptions()
@@ -55,7 +54,34 @@ public enum PredefinedOptions {
 			return new ColumnFamilyOptions()
 					.setMergeOperator(new StringAppendOperator());
 		}
+	},
+
+	/**
+	 * Default options as defined by RocksDB version 4.5.1. These options are present to allow
+	 * reproducing the memory usage behavior from Flink versions 1.0.x and 1.1.x.
+	 *
+	 * @deprecated These options are only used to make Flink 1.1.4 (updated RocksDB dependency due
+	 *             to critical bug fixes in RocksDB) behave similarly to Flink 1.1.3 and prior versions.
+	 *             This option will not be present in Flink 1.2.0 and upwards.
+	 */
+	@Deprecated
+	DEFAULT_ROCKS_4_5_1 {
+
+		@Override
+		public DBOptions createDBOptions() {
+			return new DBOptions()
+					.setUseFsync(false)
+					.setDisableDataSync(true);
+		}
 
+		@Override
+		public ColumnFamilyOptions createColumnOptions() {
+			return new ColumnFamilyOptions()
+					.setMergeOperator(new StringAppendOperator())
+					.setWriteBufferSize(4194304)
+					.setTargetFileSizeBase(2097152)
+					.setMaxBytesForLevelBase(10485760);
+		}
 	},
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a8b415fd/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 3d75bde..7c8b7b6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -132,8 +132,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	
 	// RocksDB options
 	
-	/** The pre-configured option settings */
-	private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
+	/** The pre-configured option settings - currently set to use the old RocksDB settings for
+	 * backwards compatible memory footprints */
+	@SuppressWarnings("deprecation")
+	private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT_ROCKS_4_5_1;
 	
 	/** The options factory to create the RocksDB options in the cluster */
 	private OptionsFactory optionsFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/a8b415fd/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index fca5773..5cc7182 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -197,15 +197,15 @@ public class RocksDBStateBackendConfigTest {
 	@Test
 	public void testPredefinedOptions() throws Exception {
 		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-		
-		assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
-		
+
+		assertEquals(PredefinedOptions.DEFAULT_ROCKS_4_5_1, rocksDbBackend.getPredefinedOptions());
+
 		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
 		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
 
 		DBOptions opt1 = rocksDbBackend.getDbOptions();
 		DBOptions opt2 = rocksDbBackend.getDbOptions();
-		
+
 		assertEquals(opt1, opt2);
 
 		ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions();
@@ -240,7 +240,7 @@ public class RocksDBStateBackendConfigTest {
 	public void testPredefinedAndOptionsFactory() throws Exception {
 		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
 
-		assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
+		assertEquals(PredefinedOptions.DEFAULT_ROCKS_4_5_1, rocksDbBackend.getPredefinedOptions());
 
 		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
 		rocksDbBackend.setOptions(new OptionsFactory() {