You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/23 10:08:08 UTC
flink git commit: [FLINK-8441] [State Backend] [RocksDB] change RocksDBListState to serialize values and separators in stream to avoid extra bytes copying
Repository: flink
Updated Branches:
refs/heads/master a2533f406 -> ce25688ba
[FLINK-8441] [State Backend] [RocksDB] change RocksDBListState to serialize values and separators in stream to avoid extra bytes copying
This closes #5323.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce25688b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce25688b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce25688b
Branch: refs/heads/master
Commit: ce25688bac1c1ecfa53a03ab1857bb82963b0696
Parents: a2533f4
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Jan 2 11:21:28 2018 -0800
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jan 23 11:07:06 2018 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBListState.java | 18 ++++--
.../streaming/state/util/MergeUtils.java | 63 --------------------
.../RocksDBListStatePerformanceTest.java | 37 +++++++++++-
.../streaming/state/util/MergeUtilsTest.java | 57 ------------------
4 files changed, 48 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ce25688b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 950058b..13f8d81 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -21,7 +21,6 @@ package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.contrib.streaming.state.util.MergeUtils;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalListState;
@@ -61,6 +60,11 @@ public class RocksDBListState<K, N, V>
private final WriteOptions writeOptions;
/**
+ * Separator of StringAppendTestOperator in RocksDB.
+ */
+ private static final byte DELIMITER = ',';
+
+ /**
* Creates a new {@code RocksDBListState}.
*
* @param namespaceSerializer The serializer for the namespace.
@@ -202,13 +206,17 @@ public class RocksDBListState<K, N, V>
private byte[] getPreMergedValue(List<V> values) throws IOException {
DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
- List<byte[]> bytes = new ArrayList<>(values.size());
+ keySerializationStream.reset();
+ boolean first = true;
for (V value : values) {
- keySerializationStream.reset();
+ if (first) {
+ first = false;
+ } else {
+ keySerializationStream.write(DELIMITER);
+ }
valueSerializer.serialize(value, out);
- bytes.add(keySerializationStream.toByteArray());
}
- return MergeUtils.merge(bytes);
+ return keySerializationStream.toByteArray();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce25688b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
deleted file mode 100644
index 6cf2781..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/util/MergeUtils.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.util;
-
-import org.apache.flink.annotation.VisibleForTesting;
-
-import java.util.List;
-
-/**
- * Utils to simulate StringAppendTestOperator's merge operations in RocksDB.
- */
-public class MergeUtils {
- @VisibleForTesting
- protected static final byte DELIMITER = ',';
-
- /**
- * Merge operands into a single value that can be put directly into RocksDB.
- */
- public static byte[] merge(List<byte[]> operands) {
- if (operands == null || operands.size() == 0) {
- return null;
- }
-
- if (operands.size() == 1) {
- return operands.get(0);
- }
-
- int numBytes = 0;
- for (byte[] arr : operands) {
- numBytes += arr.length + 1;
- }
- numBytes--;
-
- byte[] result = new byte[numBytes];
-
- System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length);
-
- for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) {
- result[arrIndex] = DELIMITER;
- arrIndex += 1;
- System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length);
- arrIndex += operands.get(i).length;
- }
-
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce25688b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
index 153584f..670c355 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.contrib.streaming.state.benchmark;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.contrib.streaming.state.util.MergeUtils;
import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.TestLogger;
@@ -63,6 +62,8 @@ import java.util.List;
*/
public class RocksDBListStatePerformanceTest extends TestLogger {
+ private static final byte DELIMITER = ',';
+
@Rule
public final TemporaryFolder tmp = new TemporaryFolder();
@@ -121,7 +122,7 @@ public class RocksDBListStatePerformanceTest extends TestLogger {
for (int i = 0; i < num; i++) {
list.add(valueBytes);
}
- byte[] premerged = MergeUtils.merge(list);
+ byte[] premerged = merge(list);
log.info("begin update");
@@ -132,4 +133,36 @@ public class RocksDBListStatePerformanceTest extends TestLogger {
log.info("end update - duration: {} ns", (endInsert2 - beginInsert2));
}
}
+
+ /**
+ * Merge operands into a single value that can be put directly into RocksDB.
+ */
+ public static byte[] merge(List<byte[]> operands) {
+ if (operands == null || operands.size() == 0) {
+ return null;
+ }
+
+ if (operands.size() == 1) {
+ return operands.get(0);
+ }
+
+ int numBytes = 0;
+ for (byte[] arr : operands) {
+ numBytes += arr.length + 1;
+ }
+ numBytes--;
+
+ byte[] result = new byte[numBytes];
+
+ System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length);
+
+ for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) {
+ result[arrIndex] = DELIMITER;
+ arrIndex += 1;
+ System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length);
+ arrIndex += operands.get(i).length;
+ }
+
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce25688b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
deleted file mode 100644
index 3493f76..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/util/MergeUtilsTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state.util;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for MergeUtils.
- */
-public class MergeUtilsTest {
-
- @Test
- public void testMergeMulti() {
- List<byte[]> list = Arrays.asList(
- new byte[]{0, 1, 2, 3},
- new byte[]{4},
- new byte[]{5, 6});
-
- byte[] expected = new byte[]{0, 1, 2, 3, MergeUtils.DELIMITER, 4, MergeUtils.DELIMITER, 5, 6};
- assertTrue(Arrays.equals(expected, MergeUtils.merge(list)));
- }
-
- @Test
- public void testMergeEmptyList() {
- // Empty list
- assertTrue(Arrays.equals(null, MergeUtils.merge(Collections.emptyList())));
- }
-
- @Test
- public void testMergeSingleton() {
- // Singleton list
- byte[] singletonData = new byte[] {0x42};
- assertTrue(Arrays.equals(singletonData, MergeUtils.merge(Arrays.asList(singletonData))));
- }
-}