Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/03/07 09:21:01 UTC

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

GitHub user sihuazhou opened a pull request:

    https://github.com/apache/flink/pull/5650

    [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to improve performance for recovery in RocksDB backend

    ## What is the purpose of the change
    
    This PR addresses [FLINK-8845](https://issues.apache.org/jira/browse/FLINK-8845): it uses RocksDB's `WriteBatch` to improve the performance of loading data into RocksDB during recovery. The approach is inspired by the [RocksDB FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ).
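    
    For illustration, a minimal sketch of the intended usage (it mirrors the new test; `db`, `handle`, and `restoredEntries` are assumed to be in scope, and the capacity of 200 is just an example value):
    
    ```java
    try (WriteOptions options = new WriteOptions().setDisableWAL(true);
         RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
    
        // puts are buffered in a WriteBatch and written to RocksDB in bulk
        // each time the configured capacity is reached
        for (Tuple2<byte[], byte[]> entry : restoredEntries) {
            writeBatchWrapper.put(handle, entry.f0, entry.f1);
        }
    
        // write out whatever is still buffered
        writeBatchWrapper.flush();
    }
    ```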
    
    ## Brief change log
    
      - *Introduce `RocksDBWriteBatchWrapper` to load data into RocksDB in bulk*
    
    ## Verifying this change
    
    - Introduce `RocksDBWriteBatchWrapperTest.java` to guard `RocksDBWriteBatchWrapper`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    none


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sihuazhou/flink rocksdb_write_batch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5650.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5650
    
----
commit e710287495d2a1a12a99b812c9691e12c6c57459
Author: sihuazhou <su...@...>
Date:   2018-03-07T05:58:45Z

    Introduce RocksDBWriteBatchWrapper to speed up write performance.

----


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r173048763
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.util.Preconditions;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +
    +import javax.annotation.Nonnull;
    +
    +/**
    + * A wrapper class to wrap WriteBatch.
    + */
    +public class RocksDBWriteBatchWrapper implements AutoCloseable {
    +
    +	private final static int MIN_CAPACITY = 100;
    +	private final static int MAX_CAPACITY = 10000;
    +
    +	private final RocksDB db;
    +
    +	private final WriteBatch batch;
    +
    +	private final WriteOptions options;
    +
    +	private final int capacity;
    +
    +	private int currentSize;
    +
    +	public RocksDBWriteBatchWrapper(@Nonnull RocksDB rocksDB,
    +									@Nonnull WriteOptions options,
    +									int capacity) {
    +
    +		Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
    +			"capacity should at least greater than 100");
    +
    +		this.db = rocksDB;
    +		this.options = options;
    +		this.capacity = capacity;
    +		this.batch = new WriteBatch(this.capacity);
    +		this.currentSize = 0;
    +	}
    +
    +	public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
    +
    +		this.batch.put(handle, key, value);
    +
    +		if (++currentSize == capacity) {
    +			flush();
    +		}
    +	}
    +
    +	public void flush() throws RocksDBException {
    +		this.db.write(options, batch);
    +		batch.clear();
    +		currentSize = 0;
    +	}
    +
    +	@Override
    +	public void close() throws RocksDBException {
    +		if (batch != null) {
    --- End diff --
    
    You are right, this `if` can be removed.
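    
    As a sketch (the full body of `close()` is cut off in the quoted diff, so this assumes it only needs to flush pending writes):
    
    ```java
    @Override
    public void close() throws RocksDBException {
        // batch is final and assigned in the constructor, so it can never be null here
        if (currentSize > 0) {
            flush();
        }
    }
    ```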


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r172935414
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
    +	@Override
    +	public void close() throws RocksDBException {
    +		if (batch != null) {
    --- End diff --
    
    can batch be null?


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r174231142
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
    @@ -0,0 +1,159 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +import org.junit.Assert;
    +import org.junit.Ignore;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.rocksdb.ColumnFamilyDescriptor;
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.WriteOptions;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Tests to guard {@link RocksDBWriteBatchWrapper}.
    + */
    +public class RocksDBWriteBatchWrapperTest {
    +
    +	@Rule
    +	public TemporaryFolder folder = new TemporaryFolder();
    +
    +	@Test
    +	public void basicTest() throws Exception {
    +
    +		List<Tuple2<byte[], byte[]>> data = new ArrayList<>(10000);
    +		for (int i = 0; i < 10000; ++i) {
    +			data.add(new Tuple2<>(("key:" + i).getBytes(), ("value:" + i).getBytes()));
    +		}
    +
    +		try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
    +			WriteOptions options = new WriteOptions().setDisableWAL(true);
    +			ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()));
    +			RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, options, 200)) {
    +
    +			// insert data
    +			for (Tuple2<byte[], byte[]> item : data) {
    +				writeBatchWrapper.put(handle, item.f0, item.f1);
    +			}
    +			writeBatchWrapper.flush();
    +
    +			// valid result
    +			for (Tuple2<byte[], byte[]> item : data) {
    +				Assert.assertArrayEquals(item.f1, db.get(handle, item.f0));
    +			}
    +		}
    +	}
    +
    +	@Test
    +	@Ignore
    +	public void benchMark() throws Exception {
    +
    +		// put with disableWAL=true VS put with disableWAL=false
    +		System.out.println("--> put with disableWAL=true VS put with disableWAL=false <--");
    +		benchMarkHelper(1_000, true, WRITETYPE.PUT);
    +		benchMarkHelper(1_000, false, WRITETYPE.PUT);
    +
    +		benchMarkHelper(10_000, true, WRITETYPE.PUT);
    +		benchMarkHelper(10_000, false, WRITETYPE.PUT);
    +
    +		benchMarkHelper(100_000, true, WRITETYPE.PUT);
    +		benchMarkHelper(100_000, false, WRITETYPE.PUT);
    +
    +		benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
    +		benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
    +
    +		// put with disableWAL=true VS write batch with disableWAL=false
    +		System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
    +		benchMarkHelper(1_000, true, WRITETYPE.PUT);
    +		benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
    +
    +		benchMarkHelper(10_000, true, WRITETYPE.PUT);
    +		benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
    +
    +		benchMarkHelper(100_000, true, WRITETYPE.PUT);
    +		benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
    +
    +		benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
    +		benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
    +
    +		// write batch with disableWAL=true VS write batch disableWAL = true
    +		System.out.println("--> write batch with disableWAL=true VS write batch disableWAL = true <--");
    +		benchMarkHelper(1_000, true, WRITETYPE.WRITE_BATCH);
    +		benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
    +
    +		benchMarkHelper(10_000, true, WRITETYPE.WRITE_BATCH);
    +		benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
    +
    +		benchMarkHelper(100_000, true, WRITETYPE.WRITE_BATCH);
    +		benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
    +
    +		benchMarkHelper(1_000_000, true, WRITETYPE.WRITE_BATCH);
    +		benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
    +	}
    +
    +	private enum WRITETYPE {PUT, WRITE_BATCH}
    +
    +	private void benchMarkHelper(int number, boolean disableWAL, WRITETYPE type) throws Exception {
    +
    +		List<Tuple2<byte[], byte[]>> data = new ArrayList<>(number);
    +		for (int i = 0; i < number; ++i) {
    +			data.add(new Tuple2<>(("key key key key key key" + i).getBytes(),
    +				("value value value value value" + i).getBytes()));
    +		}
    +
    +		System.out.print("number:" + number);
    +		switch (type) {
    +			case PUT:
    +				try (RocksDB db = RocksDB.open(folder.newFolder().getAbsolutePath());
    +					WriteOptions options = new WriteOptions().setDisableWAL(disableWAL);
    +					ColumnFamilyHandle handle = db.createColumnFamily(new ColumnFamilyDescriptor("test".getBytes()))) {
    +					long t1 = System.currentTimeMillis();
    +					for (Tuple2<byte[], byte[]> item : data) {
    +						db.put(handle, options, item.f0, item.f1);
    +					}
    +					long t2 = System.currentTimeMillis();
    +					System.out.println(" put cost:" + (t2 - t1) + " ms");
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r174231064
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
    +		// put with disableWAL=true VS write batch with disableWAL=false
    +		System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
    --- End diff --
    
    Replace console output with logging; you can refer to `RocksDBListStatePerformanceTest.java` for an example.
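    
    For example, with the usual SLF4J pattern (a sketch, not the exact code from `RocksDBListStatePerformanceTest.java`):
    
    ```java
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    // as a static field of the test class
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBWriteBatchWrapperTest.class);
    
    // instead of System.out.println(" put cost:" + (t2 - t1) + " ms")
    LOG.info("put cost: {} ms", t2 - t1);
    ```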


---

[GitHub] flink issue #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5650
  
    Here is a micro-benchmark for this change: @StefanRRichter @StephanEwen 
    ```
    ----------------> With disableWAL = false <----------------
    Number of values added | time for Put   | time for WriteBatch | improvement of WriteBatch over Put
    1000                   | 10146397 ns    | 3546287 ns          | 2.86x
    10000                  | 118227077 ns   | 26040222 ns         | 4.54x
    100000                 | 1838593196 ns  | 375053755 ns        | 4.9x
    1000000                | 8844612079 ns  | 2014077396 ns       | 4.39x
    
    ----------------> With disableWAL = true <----------------
    Number of values added | time for Put   | time for WriteBatch | improvement of WriteBatch over Put
    1000                   | 3955204 ns     | 2429725 ns          | 1.62x
    10000                  | 25618237 ns    | 16440113 ns         | 1.55x
    100000                 | 289153346 ns   | 183712685 ns        | 1.57x
    1000000                | 2886298967 ns  | 1768688571 ns       | 1.63x
    ```


---

[GitHub] flink issue #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5650
  
    @sihuazhou thanks for this nice contribution. LGTM 👍 Will merge.


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r172935214
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
    +	public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
    --- End diff --
    
    need synchronization on put() and flush()
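    
    For reference, the most direct form of that suggestion would be a sketch like the following (note: this is not what was merged in the end; see the follow-up discussion about thread safety below):
    
    ```java
    public synchronized void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
        batch.put(handle, key, value);
        if (++currentSize == capacity) {
            flush();
        }
    }
    
    public synchronized void flush() throws RocksDBException {
        // the intrinsic lock is reentrant, so calling flush() from put() is safe
        db.write(options, batch);
        batch.clear();
        currentSize = 0;
    }
    ```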


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r174479455
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
    +					writeBatchWrapper.flush();
    +					long t2 = System.currentTimeMillis();
    +					System.out.println(" write batch cost:" + (t2 - t1) + " ms");
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r174479344
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
    +		// put with disableWAL=true VS write batch with disableWAL=false
    +		System.out.println("--> put with disableWAL=true VS write batch with disableWAL=false <--");
    --- End diff --
    
    👍 


---

[GitHub] flink issue #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5650
  
    @bowenli86 Thanks a lot for your reviews! I've addressed all your comments. I'm waiting for 1.5 to be released; after that, maybe @StefanRRichter could also have a look at this.


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r174479322
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
    +	@Test
    +	@Ignore
    +	public void benchMark() throws Exception {
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r173048697
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
    +	public void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException {
    --- End diff --
    
    Hmm... currently it is only used in a single thread. For the best performance, I would rather not add synchronization to it; instead, I'd like to add an annotation to this class stating that it is not thread safe. We could introduce a new, thread-safe class if we really need one. What do you think?
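    
    For example, with the JSR-305 concurrency annotations (the quoted code already uses `javax.annotation.Nonnull` from the same dependency), a sketch:
    
    ```java
    import javax.annotation.concurrent.NotThreadSafe;
    
    /**
     * A wrapper around RocksDB's {@link WriteBatch}: puts are buffered and
     * written in bulk. This class is not thread safe and must only be used
     * from a single thread.
     */
    @NotThreadSafe
    public class RocksDBWriteBatchWrapper implements AutoCloseable {
        // ...
    }
    ```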


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r172934683
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
    +		Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
    +			"capacity should at least greater than 100");
    --- End diff --
    
    How is the capacity range determined? Is it recommended by RocksDB?
    
    Also, the message should be: `"capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY`
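    
    i.e., a sketch of the corrected check:
    
    ```java
    Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
        "capacity should be between " + MIN_CAPACITY + " and " + MAX_CAPACITY);
    ```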


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r174231173
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
    +					writeBatchWrapper.flush();
    +					long t2 = System.currentTimeMillis();
    +					System.out.println(" write batch cost:" + (t2 - t1) + " ms");
    --- End diff --
    
    ditto


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r174230739
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
    +	@Test
    +	@Ignore
    +	public void benchMark() throws Exception {
    --- End diff --
    
    The benchmark test needs to move to the `org.apache.flink.contrib.streaming.state.benchmark` package.


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r174479415
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java ---
    +					long t2 = System.currentTimeMillis();
    +					System.out.println(" put cost:" + (t2 - t1) + " ms");
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5650#discussion_r173049537
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java ---
    +		Preconditions.checkArgument(capacity >= MIN_CAPACITY && capacity <= MAX_CAPACITY,
    +			"capacity should at least greater than 100");
    --- End diff --
    
    About the capacity range: I didn't find a specific value recommended by RocksDB, but the [FAQ](https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ) says:
    ```
    Q: What's the fastest way to load data into RocksDB?
    ...
    2. batch hundreds of keys into one write batch
    ...
    ```
    I found that they use the word `hundreds`.


---

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5650


---