You are viewing a plain-text version of this content. The link to the canonical version (originally the word "here") was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/23 10:08:08 UTC

flink git commit: [FLINK-8441] [State Backend] [RocksDB] change RocksDBListState to serialize values and separators directly into the stream to avoid extra byte copying

Repository: flink
Updated Branches:
  refs/heads/master a2533f406 -> ce25688ba


[FLINK-8441] [State Backend] [RocksDB] change RocksDBListState to serialize values and separators directly into the stream to avoid extra byte copying

This closes #5323.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce25688b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce25688b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce25688b

Branch: refs/heads/master
Commit: ce25688bac1c1ecfa53a03ab1857bb82963b0696
Parents: a2533f4
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Jan 2 11:21:28 2018 -0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jan 23 11:07:06 2018 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBListState.java       | 18 ++++--
 .../streaming/state/util/MergeUtils.java        | 63 --------------------
 .../RocksDBListStatePerformanceTest.java        | 37 +++++++++++-
 .../streaming/state/util/MergeUtilsTest.java    | 57 ------------------
 4 files changed, 48 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce25688b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 950058b..13f8d81 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -21,7 +21,6 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.contrib.streaming.state.util.MergeUtils;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -61,6 +60,11 @@ public class RocksDBListState<K, N, V>
 	private final WriteOptions writeOptions;
 
 	/**
+	 * Delimiter written between serialized list elements; mirrors the separator used by RocksDB's StringAppendTestOperator.
+	 */
+	private static final byte DELIMITER = ',';
+
+	/**
 	 * Creates a new {@code RocksDBListState}.
 	 *
 	 * @param namespaceSerializer The serializer for the namespace.
@@ -202,13 +206,17 @@ public class RocksDBListState<K, N, V>
 	private byte[] getPreMergedValue(List<V> values) throws IOException {
 		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 
-		List<byte[]> bytes = new ArrayList<>(values.size());
+		keySerializationStream.reset();
+		boolean first = true;
 		for (V value : values) {
-			keySerializationStream.reset();
+			if (first) {
+				first = false;
+			} else {
+				keySerializationStream.write(DELIMITER);
+			}
 			valueSerializer.serialize(value, out);
-			bytes.add(keySerializationStream.toByteArray());
 		}
 
-		return MergeUtils.merge(bytes);
+		return keySerializationStream.toByteArray();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce25688b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
deleted file mode 100644
index 6cf2781..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.util;
-
-import org.apache.flink.annotation.VisibleForTesting;
-
-import java.util.List;
-
-/**
- * Utils to simulate StringAppendTestOperator's merge operations in RocksDB.
- */
-public class MergeUtils {
-	@VisibleForTesting
-	protected static final byte DELIMITER = ',';
-
-	/**
-	 * Merge operands into a single value that can be put directly into RocksDB.
-	 */
-	public static byte[] merge(List<byte[]> operands) {
-		if (operands == null || operands.size() == 0) {
-			return null;
-		}
-
-		if (operands.size() == 1) {
-			return operands.get(0);
-		}
-
-		int numBytes = 0;
-		for (byte[] arr : operands) {
-			numBytes += arr.length + 1;
-		}
-		numBytes--;
-
-		byte[] result = new byte[numBytes];
-
-		System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length);
-
-		for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) {
-			result[arrIndex] = DELIMITER;
-			arrIndex += 1;
-			System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length);
-			arrIndex += operands.get(i).length;
-		}
-
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce25688b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
index 153584f..670c355 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.contrib.streaming.state.util.MergeUtils;
 import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.TestLogger;
@@ -63,6 +62,8 @@ import java.util.List;
  */
 public class RocksDBListStatePerformanceTest extends TestLogger {
 
+	private static final byte DELIMITER = ',';
+
 	@Rule
 	public final TemporaryFolder tmp = new TemporaryFolder();
 
@@ -121,7 +122,7 @@ public class RocksDBListStatePerformanceTest extends TestLogger {
 			for (int i = 0; i < num; i++) {
 				list.add(valueBytes);
 			}
-			byte[] premerged = MergeUtils.merge(list);
+			byte[] premerged = merge(list);
 
 			log.info("begin update");
 
@@ -132,4 +133,36 @@ public class RocksDBListStatePerformanceTest extends TestLogger {
 			log.info("end update - duration: {} ns", (endInsert2 - beginInsert2));
 		}
 	}
+
+	/**
+	 * Joins operands with DELIMITER (as RocksDB's StringAppendTestOperator would); returns null for a null/empty list and the sole array itself (not a copy) for a singleton.
+	 */
+	public static byte[] merge(List<byte[]> operands) {
+		if (operands == null || operands.size() == 0) {
+			return null;
+		}
+
+		if (operands.size() == 1) {
+			return operands.get(0);
+		}
+
+		int numBytes = 0;
+		for (byte[] arr : operands) {
+			numBytes += arr.length + 1;
+		}
+		numBytes--;
+
+		byte[] result = new byte[numBytes];
+
+		System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length);
+
+		for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) {
+			result[arrIndex] = DELIMITER;
+			arrIndex += 1;
+			System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length);
+			arrIndex += operands.get(i).length;
+		}
+
+		return result;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce25688b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
deleted file mode 100644
index 3493f76..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.util;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for MergeUtils.
- */
-public class MergeUtilsTest {
-
-	@Test
-	public void testMergeMulti() {
-		List<byte[]> list = Arrays.asList(
-			new byte[]{0, 1, 2, 3},
-			new byte[]{4},
-			new byte[]{5, 6});
-
-		byte[] expected = new byte[]{0, 1, 2, 3, MergeUtils.DELIMITER, 4, MergeUtils.DELIMITER, 5, 6};
-		assertTrue(Arrays.equals(expected, MergeUtils.merge(list)));
-	}
-
-	@Test
-	public void testMergeEmptyList() {
-		// Empty list
-		assertTrue(Arrays.equals(null, MergeUtils.merge(Collections.emptyList())));
-	}
-
-	@Test
-	public void testMergeSingleton() {
-		// Singleton list
-		byte[] singletonData = new byte[] {0x42};
-		assertTrue(Arrays.equals(singletonData, MergeUtils.merge(Arrays.asList(singletonData))));
-	}
-}