You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 14:27:32 UTC

[3/4] flink git commit: [hotfix] [RocksDB backend] Minor cleanups to constructors and comments in RocksDBStateBackend

[hotfix] [RocksDB backend] Minor cleanups to constructors and comments in RocksDBStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9ddcb4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9ddcb4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9ddcb4c

Branch: refs/heads/master
Commit: d9ddcb4c297082ef9544c1b5d94c75267662b4c6
Parents: d450bee
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 7 18:39:32 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 15:26:42 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 34 ++++++++------------
 .../state/RocksDBAsyncSnapshotTest.java         |  6 ++--
 .../state/RocksDBStateBackendConfigTest.java    |  3 +-
 .../state/RocksDBStateBackendTest.java          |  2 +-
 .../flink/cep/operator/CEPOperatorTest.java     | 10 ++----
 .../EventTimeWindowCheckpointingITCase.java     |  3 +-
 .../test/state/ManualWindowSpeedITCase.java     | 11 +++----
 7 files changed, 26 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 37c6312..9ba0dc1 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -29,15 +29,15 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -134,29 +134,23 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
 	 */
 	public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
-		// creating the FsStateBackend automatically sanity checks the URI
-		FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri);
-
-		this.checkpointStreamBackend = fsStateBackend;
-	}
-
-
-	public RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
-		this(new Path(checkpointDataUri).toUri(), nonPartitionedStateBackend);
+		this(new FsStateBackend(checkpointDataUri));
 	}
 
-	public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend checkpointStreamBackend) throws IOException {
+	/**
+	 * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its
+	 * checkpoint data streams. Typically, one would supply a filesystem or database state backend
+	 * here where the snapshots from RocksDB would be stored.
+	 * 
+	 * <p>The snapshots of the RocksDB state will be stored using the given backend's
+	 * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}. 
+	 * 
+	 * @param checkpointStreamBackend The backend used to store the checkpoint data streams of the RocksDB snapshots.
+	 */
+	public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
 		this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend);
 	}
 
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		oos.defaultWriteObject();
-	}
-
-	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-		ois.defaultReadObject();
-		isInitialized = false;
-	}
 	// ------------------------------------------------------------------------
 	//  State backend methods
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 1c5a91c..f3da93e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -119,9 +119,8 @@ public class RocksDBAsyncSnapshotTest {
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 
 		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
-		File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
 
-		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
+		RocksDBStateBackend backend = new RocksDBStateBackend(new MemoryStateBackend());
 		backend.setDbStoragePath(dbDir.getAbsolutePath());
 
 		streamConfig.setStateBackend(backend);
@@ -220,11 +219,10 @@ public class RocksDBAsyncSnapshotTest {
 		StreamConfig streamConfig = testHarness.getStreamConfig();
 
 		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
-		File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
 
 		BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend();
 
-		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), memoryStateBackend);
+		RocksDBStateBackend backend = new RocksDBStateBackend(memoryStateBackend);
 		backend.setDbStoragePath(dbDir.getAbsolutePath());
 
 		streamConfig.setStateBackend(backend);

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 07fc27c..bf9b315 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -343,8 +343,7 @@ public class RocksDBStateBackendConfigTest {
 	@Test
 	public void testCallsForwardedToNonPartitionedBackend() throws Exception {
 		AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class);
-		String checkpointPath = tempFolder.newFolder().toURI().toString();
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath, nonPartBackend);
+		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(nonPartBackend);
 
 		Environment env = getMockEnvironment();
 		rocksDbBackend.createStreamFactory(env.getJobID(), "foobar");

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9222f0b..9d25434 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -45,7 +45,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	protected RocksDBStateBackend getStateBackend() throws IOException {
 		String dbPath = tempFolder.newFolder().getAbsolutePath();
 		String checkpointPath = tempFolder.newFolder().toURI().toString();
-		RocksDBStateBackend backend = new RocksDBStateBackend(checkpointPath, new FsStateBackend(checkpointPath));
+		RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath));
 		backend.setDbStoragePath(dbPath);
 		return backend;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 0f49b13..6cffd9c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -301,9 +301,7 @@ public class CEPOperatorTest extends TestLogger {
 	public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
 
 		String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
-		String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-		RocksDBStateBackend rocksDBStateBackend =
-				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
@@ -350,8 +348,7 @@ public class CEPOperatorTest extends TestLogger {
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO);
 
-		rocksDBStateBackend =
-				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+		rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 		harness.setStateBackend(rocksDBStateBackend);
 
@@ -381,8 +378,7 @@ public class CEPOperatorTest extends TestLogger {
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO);
 
-		rocksDBStateBackend =
-				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+		rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 		harness.setStateBackend(rocksDBStateBackend);
 		harness.setup();

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 4dbf5cb..4f28d8c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -120,8 +120,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 			}
 			case ROCKSDB_FULLY_ASYNC: {
 				String rocksDb = tempFolder.newFolder().getAbsolutePath();
-				String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-				RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE));
+				RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
 				rdb.setDbStoragePath(rocksDb);
 				this.stateBackend = rdb;
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
index 428c47c..456861a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
@@ -127,9 +127,8 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
 
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 		env.setParallelism(1);
-
-		String checkpoints = tempFolder.newFolder().toURI().toString();
-		env.setStateBackend(new RocksDBStateBackend(checkpoints, new MemoryStateBackend()));
+		
+		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
 
 		env.addSource(new InfiniteTupleSource(10_000))
 				.keyBy(0)
@@ -163,8 +162,7 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 		env.setParallelism(1);
 
-		String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-		env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()));
+		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
 
 		env.addSource(new InfiniteTupleSource(10_000))
 				.keyBy(0)
@@ -199,8 +197,7 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 		env.setParallelism(1);
 
-		String rocksDbBackups = tempFolder.newFolder().toURI().toString();
-		env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()));
+		env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
 
 		env.addSource(new InfiniteTupleSource(10_000))
 				.keyBy(0)