You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 14:27:32 UTC
[3/4] flink git commit: [hotfix] [RocksDB backend] Minor cleanups to
constructors and comments in RocksDBStateBackend
[hotfix] [RocksDB backend] Minor cleanups to constructors and comments in RocksDBStateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9ddcb4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9ddcb4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9ddcb4c
Branch: refs/heads/master
Commit: d9ddcb4c297082ef9544c1b5d94c75267662b4c6
Parents: d450bee
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 7 18:39:32 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 15:26:42 2016 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBStateBackend.java | 34 ++++++++------------
.../state/RocksDBAsyncSnapshotTest.java | 6 ++--
.../state/RocksDBStateBackendConfigTest.java | 3 +-
.../state/RocksDBStateBackendTest.java | 2 +-
.../flink/cep/operator/CEPOperatorTest.java | 10 ++----
.../EventTimeWindowCheckpointingITCase.java | 3 +-
.../test/state/ManualWindowSpeedITCase.java | 11 +++----
7 files changed, 26 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 37c6312..9ba0dc1 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -29,15 +29,15 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@@ -134,29 +134,23 @@ public class RocksDBStateBackend extends AbstractStateBackend {
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
- // creating the FsStateBackend automatically sanity checks the URI
- FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri);
-
- this.checkpointStreamBackend = fsStateBackend;
- }
-
-
- public RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
- this(new Path(checkpointDataUri).toUri(), nonPartitionedStateBackend);
+ this(new FsStateBackend(checkpointDataUri));
}
- public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend checkpointStreamBackend) throws IOException {
+ /**
+ * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its
+ * checkpoint data streams. Typically, one would supply a filesystem or database state backend
+ * here where the snapshots from RocksDB would be stored.
+ *
+ * <p>The snapshots of the RocksDB state will be stored using the given backend's
+ * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
+ *
+ * @param checkpointStreamBackend The backend to store the
+ */
+ public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend);
}
- private void writeObject(ObjectOutputStream oos) throws IOException {
- oos.defaultWriteObject();
- }
-
- private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
- ois.defaultReadObject();
- isInitialized = false;
- }
// ------------------------------------------------------------------------
// State backend methods
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 1c5a91c..f3da93e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -119,9 +119,8 @@ public class RocksDBAsyncSnapshotTest {
StreamConfig streamConfig = testHarness.getStreamConfig();
File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
- File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
- RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
+ RocksDBStateBackend backend = new RocksDBStateBackend(new MemoryStateBackend());
backend.setDbStoragePath(dbDir.getAbsolutePath());
streamConfig.setStateBackend(backend);
@@ -220,11 +219,10 @@ public class RocksDBAsyncSnapshotTest {
StreamConfig streamConfig = testHarness.getStreamConfig();
File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
- File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend();
- RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), memoryStateBackend);
+ RocksDBStateBackend backend = new RocksDBStateBackend(memoryStateBackend);
backend.setDbStoragePath(dbDir.getAbsolutePath());
streamConfig.setStateBackend(backend);
http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 07fc27c..bf9b315 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -343,8 +343,7 @@ public class RocksDBStateBackendConfigTest {
@Test
public void testCallsForwardedToNonPartitionedBackend() throws Exception {
AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class);
- String checkpointPath = tempFolder.newFolder().toURI().toString();
- RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath, nonPartBackend);
+ RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(nonPartBackend);
Environment env = getMockEnvironment();
rocksDbBackend.createStreamFactory(env.getJobID(), "foobar");
http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9222f0b..9d25434 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -45,7 +45,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
protected RocksDBStateBackend getStateBackend() throws IOException {
String dbPath = tempFolder.newFolder().getAbsolutePath();
String checkpointPath = tempFolder.newFolder().toURI().toString();
- RocksDBStateBackend backend = new RocksDBStateBackend(checkpointPath, new FsStateBackend(checkpointPath));
+ RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath));
backend.setDbStoragePath(dbPath);
return backend;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 0f49b13..6cffd9c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -301,9 +301,7 @@ public class CEPOperatorTest extends TestLogger {
public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
- String rocksDbBackups = tempFolder.newFolder().toURI().toString();
- RocksDBStateBackend rocksDBStateBackend =
- new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+ RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
rocksDBStateBackend.setDbStoragePath(rocksDbPath);
KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
@@ -350,8 +348,7 @@ public class CEPOperatorTest extends TestLogger {
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- rocksDBStateBackend =
- new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+ rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
rocksDBStateBackend.setDbStoragePath(rocksDbPath);
harness.setStateBackend(rocksDBStateBackend);
@@ -381,8 +378,7 @@ public class CEPOperatorTest extends TestLogger {
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
- rocksDBStateBackend =
- new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
+ rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
rocksDBStateBackend.setDbStoragePath(rocksDbPath);
harness.setStateBackend(rocksDBStateBackend);
harness.setup();
http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 4dbf5cb..4f28d8c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -120,8 +120,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
}
case ROCKSDB_FULLY_ASYNC: {
String rocksDb = tempFolder.newFolder().getAbsolutePath();
- String rocksDbBackups = tempFolder.newFolder().toURI().toString();
- RocksDBStateBackend rdb = new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend(MAX_MEM_STATE_SIZE));
+ RocksDBStateBackend rdb = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
rdb.setDbStoragePath(rocksDb);
this.stateBackend = rdb;
break;
http://git-wip-us.apache.org/repos/asf/flink/blob/d9ddcb4c/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
index 428c47c..456861a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
@@ -127,9 +127,8 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setParallelism(1);
-
- String checkpoints = tempFolder.newFolder().toURI().toString();
- env.setStateBackend(new RocksDBStateBackend(checkpoints, new MemoryStateBackend()));
+
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
env.addSource(new InfiniteTupleSource(10_000))
.keyBy(0)
@@ -163,8 +162,7 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setParallelism(1);
- String rocksDbBackups = tempFolder.newFolder().toURI().toString();
- env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()));
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
env.addSource(new InfiniteTupleSource(10_000))
.keyBy(0)
@@ -199,8 +197,7 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(1);
- String rocksDbBackups = tempFolder.newFolder().toURI().toString();
- env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()));
+ env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
env.addSource(new InfiniteTupleSource(10_000))
.keyBy(0)