Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/03/07 09:21:01 UTC
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
GitHub user sihuazhou opened a pull request:
https://github.com/apache/flink/pull/5650
[FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to improve performance for recovery in RocksDB backend
## What is the purpose of the change
This PR addresses [FLINK-8845](https://issues.apache.org/jira/browse/FLINK-8845). It uses RocksDB's `WriteBatch` to improve the performance of loading data into RocksDB during recovery, an approach inspired by the [RocksDB FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ).
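The core idea, as a minimal hypothetical sketch (not the PR's actual wrapper): buffer entries in a `WriteBatch` and write them to RocksDB in chunks instead of issuing one `put` per key. The method name, chunk size, and entry list below are illustrative only.
```
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import java.util.List;
import java.util.Map;

public class BulkLoadSketch {

    /** Buffers key/value pairs in a WriteBatch and writes them to RocksDB chunk by chunk. */
    static void bulkLoad(RocksDB db,
                         ColumnFamilyHandle handle,
                         List<Map.Entry<byte[], byte[]>> entries,
                         int chunkSize) throws RocksDBException {
        try (WriteOptions options = new WriteOptions().setDisableWAL(true);
             WriteBatch batch = new WriteBatch()) {
            int buffered = 0;
            for (Map.Entry<byte[], byte[]> entry : entries) {
                batch.put(handle, entry.getKey(), entry.getValue());
                if (++buffered == chunkSize) {
                    db.write(options, batch); // one native write for the whole chunk
                    batch.clear();
                    buffered = 0;
                }
            }
            if (buffered > 0) {
                db.write(options, batch); // flush the remaining tail
            }
        }
    }
}
```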
## Brief change log
- *Introduce `RocksDBWriteBatchWrapper` to load data into RocksDB in bulk*
## Verifying this change
- Introduce `RocksDBWriteBatchWrapperTest.java` to guard `RocksDBWriteBatchWrapper`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
none
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sihuazhou/flink rocksdb_write_batch
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5650.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5650
----
commit e710287495d2a1a12a99b812c9691e12c6c57459
Author: sihuazhou <su...@...>
Date: 2018-03-07T05:58:45Z
Introduce RocksDBWriteBatchWrapper to speed up write performance.
----
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r173048763
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+ private final static int MIN_CAPACITY = 100;
+ private final static int MAX_CAPACITY = 10000;
+
+ private final RocksDB db;
+
+ private final WriteBatch batch;
+
+ private final WriteOptions options;
+
+ private final int capacity;
+
+ private int currentSize;
+
+ public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+ @Nonnull WriteOptions options,
+ int capacity) {
+
+ Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+ "capacity should at least greater than 100");
+
+ this.db = rocksDB;
+ this.options = options;
+ this.capacity = capacity;
+ this.batch = new WriteBatch(this.capacity);
+ this.currentSize = 0;
+ }
+
+ public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
+
+ this.batch.put(handle, key, value);
+
+ if (++currentSize == capacity) {
+ flush();
+ }
+ }
+
+ public void flush() throws RocksDBException {
+ this.db.write(options, batch);
+ batch.clear();
+ currentSize = 0;
+ }
+
+ @Override
+ public void close() throws RocksDBException {
+ if (batch != null) {
--- End diff --
You are right, this `if` can be removed.
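For reference, a sketch of what `close()` could look like without the null check; the quoted diff cuts off before the rest of the original body, so flushing any still-buffered entries before releasing the batch is an assumption here, not the merged code:
```
@Override
public void close() throws RocksDBException {
    if (currentSize > 0) {
        flush(); // write any still-buffered entries before releasing the batch
    }
    batch.close();
}
```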
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r172935414
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+ private final static int MIN_CAPACITY = 100;
+ private final static int MAX_CAPACITY = 10000;
+
+ private final RocksDB db;
+
+ private final WriteBatch batch;
+
+ private final WriteOptions options;
+
+ private final int capacity;
+
+ private int currentSize;
+
+ public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+ @Nonnull WriteOptions options,
+ int capacity) {
+
+ Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+ "capacity should at least greater than 100");
+
+ this.db = rocksDB;
+ this.options = options;
+ this.capacity = capacity;
+ this.batch = new WriteBatch(this.capacity);
+ this.currentSize = 0;
+ }
+
+ public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
+
+ this.batch.put(handle, key, value);
+
+ if (++currentSize == capacity) {
+ flush();
+ }
+ }
+
+ public void flush() throws RocksDBException {
+ this.db.write(options, batch);
+ batch.clear();
+ currentSize = 0;
+ }
+
+ @Override
+ public void close() throws RocksDBException {
+ if (batch != null) {
--- End diff --
can batch be null?
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r174231142
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void basicTest() throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+ for (int i = 0; i < 10000; ++i) {
+ data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+ }
+
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(true);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+ // insert data
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+
+ // valid result
+ for (Tuple2<byte[], byte[]> item : data) {
+ Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+ }
+ }
+ }
+
+ @Test
+ @Ignore
+ public void benchMark() throws Exception {
+
+ // put with disableWAL=true VS put with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--");
+ benchMarkHelper(1_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(10_000, true, WRITETYPE.PUT);
+ benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(100_000, true, WRITETYPE.PUT);
+ benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+ // put with disableWAL=true VS write batch with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
+ benchMarkHelper(1_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(10_000, true, WRITETYPE.PUT);
+ benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(100_000, true, WRITETYPE.PUT);
+ benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+ // write batch with disableWAL=true VS write batch disableWAL = true
+ System.out.println("--> write batch with disableWAL=true VS write batch disableWAL = true <--");
+ benchMarkHelper(1_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(10_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(100_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+ }
+
+ private enum WRITETYPE {PUT, WRITE_BATCH}
+
+ private void benchMarkHelper(int number, boolean disableWAL, WRITETYPE type) throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(number);
+ for (int i = 0; i < number; ++i) {
+ data.add(new Tuple2<>(("key key key key key key" + i).getBytes(),
+ ("value value value value value" + i).getBytes()));
+ }
+
+ System.out.print("number:" + number);
+ switch (type) {
+ case PUT:
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()))) {
+ long t1 = System.currentTimeMillis();
+ for (Tuple2<byte[], byte[]> item : data) {
+ db.put(handle, options, item.f0, item.f1);
+ }
+ long t2 = System.currentTimeMillis();
+ System.out.println(" put cost:" + (t2 - t1) + " ms");
--- End diff --
ditto
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r174231064
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void basicTest() throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+ for (int i = 0; i < 10000; ++i) {
+ data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+ }
+
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(true);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+ // insert data
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+
+ // valid result
+ for (Tuple2<byte[], byte[]> item : data) {
+ Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+ }
+ }
+ }
+
+ @Test
+ @Ignore
+ public void benchMark() throws Exception {
+
+ // put with disableWAL=true VS put with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--");
+ benchMarkHelper(1_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(10_000, true, WRITETYPE.PUT);
+ benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(100_000, true, WRITETYPE.PUT);
+ benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+ // put with disableWAL=true VS write batch with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
--- End diff --
replace console output with logging, you can refer to `RocksDBListStatePerformanceTest.java`
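A minimal sketch of that suggestion, assuming the SLF4J API already used across Flink test code; the class and method names are illustrative, and `RocksDBListStatePerformanceTest.java` may structure it differently:
```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BenchmarkLoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(BenchmarkLoggingSketch.class);

    /** Reports one benchmark measurement via the logger instead of System.out. */
    static void report(int number, long startMillis, long endMillis) {
        LOG.info("number: {}, put cost: {} ms", number, endMillis - startMillis);
    }
}
```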
---
[GitHub] flink issue #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5650
A micro-benchmark for this: @StefanRRichter @StephanEwen
```
-----------------------> With disableWAL = false <-----------------
Number of values added | time for Put  | time for WriteBatch | improvement of WriteBatch over Put
1000                   | 10146397 ns   | 3546287 ns          | 2.86x
10000                  | 118227077 ns  | 26040222 ns         | 4.54x
100000                 | 1838593196 ns | 375053755 ns        | 4.9x
1000000                | 8844612079 ns | 2014077396 ns       | 4.39x
-----------------------> With disableWAL = true <-----------------
1000                   | 3955204 ns    | 2429725 ns          | 1.62x
10000                  | 25618237 ns   | 16440113 ns         | 1.55x
100000                 | 289153346 ns  | 183712685 ns        | 1.57x
1000000                | 2886298967 ns | 1768688571 ns       | 1.63x
```
---
[GitHub] flink issue #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5650
@sihuazhou thanks for this nice contribution. LGTM 👍 Will merge.
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r172935214
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+ private final static int MIN_CAPACITY = 100;
+ private final static int MAX_CAPACITY = 10000;
+
+ private final RocksDB db;
+
+ private final WriteBatch batch;
+
+ private final WriteOptions options;
+
+ private final int capacity;
+
+ private int currentSize;
+
+ public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+ @Nonnull WriteOptions options,
+ int capacity) {
+
+ Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+ "capacity should at least greater than 100");
+
+ this.db = rocksDB;
+ this.options = options;
+ this.capacity = capacity;
+ this.batch = new WriteBatch(this.capacity);
+ this.currentSize = 0;
+ }
+
+ public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
--- End diff --
need synchronization on put() and flush()
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r174479455
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void basicTest() throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+ for (int i = 0; i < 10000; ++i) {
+ data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+ }
+
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(true);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+ // insert data
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+
+ // valid result
+ for (Tuple2<byte[], byte[]> item : data) {
+ Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+ }
+ }
+ }
+
+ @Test
+ @Ignore
+ public void benchMark() throws Exception {
+
+ // put with disableWAL=true VS put with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--");
+ benchMarkHelper(1_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(10_000, true, WRITETYPE.PUT);
+ benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(100_000, true, WRITETYPE.PUT);
+ benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+ // put with disableWAL=true VS write batch with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
+ benchMarkHelper(1_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(10_000, true, WRITETYPE.PUT);
+ benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(100_000, true, WRITETYPE.PUT);
+ benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+ // write batch with disableWAL=true VS write batch disableWAL = true
+ System.out.println("--> write batch with disableWAL=true VS write batch disableWAL = true <--");
+ benchMarkHelper(1_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(10_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(100_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+ }
+
+ private enum WRITETYPE {PUT, WRITE_BATCH}
+
+ private void benchMarkHelper(int number, boolean disableWAL, WRITETYPE type) throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(number);
+ for (int i = 0; i < number; ++i) {
+ data.add(new Tuple2<>(("key key key key key key" + i).getBytes(),
+ ("value value value value value" + i).getBytes()));
+ }
+
+ System.out.print("number:" + number);
+ switch (type) {
+ case PUT:
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()))) {
+ long t1 = System.currentTimeMillis();
+ for (Tuple2<byte[], byte[]> item : data) {
+ db.put(handle, options, item.f0, item.f1);
+ }
+ long t2 = System.currentTimeMillis();
+ System.out.println(" put cost:" + (t2 - t1) + " ms");
+ }
+ break;
+ case WRITE_BATCH:
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+ long t1 = System.currentTimeMillis();
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+ long t2 = System.currentTimeMillis();
+ System.out.println(" write batch cost:" + (t2 - t1) + " ms");
--- End diff --
👍
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r174479344
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void basicTest() throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+ for (int i = 0; i < 10000; ++i) {
+ data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+ }
+
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(true);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+ // insert data
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+
+ // valid result
+ for (Tuple2<byte[], byte[]> item : data) {
+ Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+ }
+ }
+ }
+
+ @Test
+ @Ignore
+ public void benchMark() throws Exception {
+
+ // put with disableWAL=true VS put with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--");
+ benchMarkHelper(1_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(10_000, true, WRITETYPE.PUT);
+ benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(100_000, true, WRITETYPE.PUT);
+ benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+ // put with disableWAL=true VS write batch with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
--- End diff --
👍
---
[GitHub] flink issue #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5650
@bowenli86 Thanks a lot for your reviews. I've addressed all your comments. I'm waiting for 1.5 to be released; after that, maybe @StefanRRichter could also have a look at this.
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r174479322
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void basicTest() throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+ for (int i = 0; i < 10000; ++i) {
+ data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+ }
+
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(true);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+ // insert data
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+
+ // valid result
+ for (Tuple2<byte[], byte[]> item : data) {
+ Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+ }
+ }
+ }
+
+ @Test
+ @Ignore
+ public void benchMark() throws Exception {
--- End diff --
👍
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r173048697
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+ private final static int MIN_CAPACITY = 100;
+ private final static int MAX_CAPACITY = 10000;
+
+ private final RocksDB db;
+
+ private final WriteBatch batch;
+
+ private final WriteOptions options;
+
+ private final int capacity;
+
+ private int currentSize;
+
+ public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+ @Nonnull WriteOptions options,
+ int capacity) {
+
+ Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+ "capacity should at least greater than 100");
+
+ this.db = rocksDB;
+ this.options = options;
+ this.capacity = capacity;
+ this.batch = new WriteBatch(this.capacity);
+ this.currentSize = 0;
+ }
+
+ public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
--- End diff --
Hmm... currently it is only used in a single thread. For the best performance I wouldn't like to add synchronization to it; I'd rather add an annotation to this class stating that it is not thread safe. We could introduce a new thread-safe class if we really need one. What do you think?
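A sketch of that documentation-only alternative; whether the JSR-305 `@NotThreadSafe` annotation is available on the classpath is an assumption here, and the merged code may use a plain Javadoc note instead:
```
import javax.annotation.concurrent.NotThreadSafe;

/** Wrapper around WriteBatch; confined to the single thread that performs the restore. */
@NotThreadSafe
public class RocksDBWriteBatchWrapper implements AutoCloseable {

    // put() and flush() stay unsynchronized; callers must not share an instance across threads.

    @Override
    public void close() {
        // release the underlying WriteBatch as in the PR; omitted in this sketch
    }
}
```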
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r172934683
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+ private final static int MIN_CAPACITY = 100;
+ private final static int MAX_CAPACITY = 10000;
+
+ private final RocksDB db;
+
+ private final WriteBatch batch;
+
+ private final WriteOptions options;
+
+ private final int capacity;
+
+ private int currentSize;
+
+ public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+ @Nonnull WriteOptions options,
+ int capacity) {
+
+ Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+ "capacity should at least greater than 100");
--- End diff --
how is the capacity range determined - is it recommended by RocksDB?
the msg should be: "capacity should be between " + MIN + " and " + MAX
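A sketch of the suggested check with that message (the merged PR may word it slightly differently):
```
import org.apache.flink.util.Preconditions;

final class CapacityCheckSketch {

    private static final int MIN_CAPACITY = 100;
    private static final int MAX_CAPACITY = 10000;

    /** Validates the batch capacity with a message that names the allowed range. */
    static void checkCapacity(int capacity) {
        Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
            "capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY + ", but was: " + capacity);
    }
}
```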
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r174231173
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void basicTest() throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+ for (int i = 0; i < 10000; ++i) {
+ data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+ }
+
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(true);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+ // insert data
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+
+ // valid result
+ for (Tuple2<byte[], byte[]> item : data) {
+ Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+ }
+ }
+ }
+
+ @Test
+ @Ignore
+ public void benchMark() throws Exception {
+
+ // put with disableWAL=true VS put with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--");
+ benchMarkHelper(1_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(10_000, true, WRITETYPE.PUT);
+ benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(100_000, true, WRITETYPE.PUT);
+ benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+ // put with disableWAL=true VS write batch with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
+ benchMarkHelper(1_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(10_000, true, WRITETYPE.PUT);
+ benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(100_000, true, WRITETYPE.PUT);
+ benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+ // write batch with disableWAL=true VS write batch disableWAL = true
+ System.out.println("--> write batch with disableWAL=true VS write batch disableWAL = true <--");
+ benchMarkHelper(1_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(10_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(100_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+ }
+
+ private enum WRITETYPE {PUT, WRITE_BATCH}
+
+ private void benchMarkHelper(int number, boolean disableWAL, WRITETYPE type) throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(number);
+ for (int i = 0; i < number; ++i) {
+ data.add(new Tuple2<>(("key key key key key key" + i).getBytes(),
+ ("value value value value value" + i).getBytes()));
+ }
+
+ System.out.print("number:" + number);
+ switch (type) {
+ case PUT:
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()))) {
+ long t1 = System.currentTimeMillis();
+ for (Tuple2<byte[], byte[]> item : data) {
+ db.put(handle, options, item.f0, item.f1);
+ }
+ long t2 = System.currentTimeMillis();
+ System.out.println(" put cost:" + (t2 - t1) + " ms");
+ }
+ break;
+ case WRITE_BATCH:
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+ long t1 = System.currentTimeMillis();
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+ long t2 = System.currentTimeMillis();
+ System.out.println(" write batch cost:" + (t2 - t1) + " ms");
--- End diff --
ditto
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r174230739
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void basicTest() throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+ for (int i = 0; i < 10000; ++i) {
+ data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+ }
+
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(true);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+ // insert data
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+
+ // valid result
+ for (Tuple2<byte[], byte[]> item : data) {
+ Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+ }
+ }
+ }
+
+ @Test
+ @Ignore
+ public void benchMark() throws Exception {
--- End diff --
Need to move the benchmark test to `org.apache.flink.contrib.streaming.state.benchmark` package.
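A sketch of what that relocation could look like; the class name below is hypothetical:
```
// Hypothetical target location for the ignored benchmark methods.
package org.apache.flink.contrib.streaming.state.benchmark;

/** Micro-benchmarks for RocksDBWriteBatchWrapper, kept out of the regular unit tests. */
public class RocksDBWriteBatchWrapperBenchmarkTest {
    // benchMark() and benchMarkHelper(...) moved here from RocksDBWriteBatchWrapperTest
}
```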
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r174479415
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void basicTest() throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
+ for (int i = 0; i < 10000; ++i) {
+ data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
+ }
+
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(true);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
+ RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
+
+ // insert data
+ for (Tuple2<byte[], byte[]> item : data) {
+ writeBatchWrapper.put(handle, item.f0, item.f1);
+ }
+ writeBatchWrapper.flush();
+
+ // valid result
+ for (Tuple2<byte[], byte[]> item : data) {
+ Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
+ }
+ }
+ }
+
+ @Test
+ @Ignore
+ public void benchMark() throws Exception {
+
+ // put with disableWAL=true VS put with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--");
+ benchMarkHelper(1_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(10_000, true, WRITETYPE.PUT);
+ benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(100_000, true, WRITETYPE.PUT);
+ benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+ // put with disableWAL=true VS write batch with disableWAL=false
+ System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
+ benchMarkHelper(1_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(10_000, true, WRITETYPE.PUT);
+ benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(100_000, true, WRITETYPE.PUT);
+ benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+ benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+ // write batch with disableWAL=true VS write batch disableWAL = true
+ System.out.println("--> write batch with disableWAL=true VS write batch disableWAL = true <--");
+ benchMarkHelper(1_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(10_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(100_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+ benchMarkHelper(1_000_000, true, WRITETYPE.WRITE_BATCH);
+ benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+ }
+
+ private enum WRITETYPE {PUT, WRITE_BATCH}
+
+ private void benchMarkHelper(int number, boolean disableWAL, WRITETYPE type) throws Exception {
+
+ List<Tuple2<byte[], byte[]>> data = new ArrayList<>(number);
+ for (int i = 0; i < number; ++i) {
+ data.add(new Tuple2<>(("key key key key key key" + i).getBytes(),
+ ("value value value value value" + i).getBytes()));
+ }
+
+ System.out.print("number:" + number);
+ switch (type) {
+ case PUT:
+ try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
+ WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
+ ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()))) {
+ long t1 = System.currentTimeMillis();
+ for (Tuple2<byte[], byte[]> item : data) {
+ db.put(handle, options, item.f0, item.f1);
+ }
+ long t2 = System.currentTimeMillis();
+ System.out.println(" put cost:" + (t2 - t1) + " ms");
--- End diff --
👍
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5650#discussion_r173049537
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A wrapper class to wrap WriteBatch.
+ */
+public class RocksDBWriteBatchWrapper implements AutoCloseable {
+
+ private final static int MIN_CAPACITY = 100;
+ private final static int MAX_CAPACITY = 10000;
+
+ private final RocksDB db;
+
+ private final WriteBatch batch;
+
+ private final WriteOptions options;
+
+ private final int capacity;
+
+ private int currentSize;
+
+ public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
+ @Nonnull WriteOptions options,
+ int capacity) {
+
+ Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
+ "capacity should at least greater than 100");
--- End diff --
About the capacity range, I didn't find a specific value recommended by RocksDB, but from the [FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ):
```
Q: What's the fastest way to load data into RocksDB?
...
2. batch hundreds of keys into one write batch
...
```
I found that they use the word `hundreds`.
---
[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5650
---