You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/04/17 20:17:43 UTC
[geode] branch develop updated: GEODE-6579: optimize string
deserialization (#3381)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new def65d4 GEODE-6579: optimize string deserialization (#3381)
def65d4 is described below
commit def65d4d9018c43a56ea0de98fe4640c0d4e40c5
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Wed Apr 17 13:17:24 2019 -0700
GEODE-6579: optimize string deserialization (#3381)
Added ThreadLocalByteArrayCache, used by readString for ASCII strings of at most 65535 bytes.
Also added a JMH benchmark for InternalDataSerializer.readString.
ThreadLocalByteArrayCache uses a SoftReference to allow the cached byte array
to be garbage collected if the JVM runs low on memory.
---
.../internal/InternalDataSerializerBenchmark.java | 72 ++++++++++++++++++
.../geode/internal/InternalDataSerializer.java | 8 +-
.../geode/internal/ThreadLocalByteArrayCache.java | 56 ++++++++++++++
.../internal/ThreadLocalByteArrayCacheTest.java | 85 ++++++++++++++++++++++
4 files changed, 219 insertions(+), 2 deletions(-)
diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java b/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java
new file mode 100644
index 0000000..1417a50
--- /dev/null
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import org.apache.geode.DataSerializer;
+
+
+/**
+ * JMH benchmark measuring the throughput of {@link InternalDataSerializer#readString} when it
+ * decodes a short value that was serialized with the STRING_BYTES header.
+ */
+@State(Scope.Thread)
+@Fork(1)
+public class InternalDataSerializerBenchmark {
+
+  /** Reused input wrapper; re-pointed at {@link #serializedBytes} before every invocation. */
+  private final ByteArrayDataInput dataInput = new ByteArrayDataInput();
+
+  /** The serialized sample string with its leading header byte stripped off. */
+  private byte[] serializedBytes;
+
+  /**
+   * Serializes the sample string once per trial and keeps only the payload that follows the
+   * header byte.
+   *
+   * @throws IllegalStateException if the string was not written with the STRING_BYTES header
+   */
+  @Setup(Level.Trial)
+  public void setup() throws IOException {
+    HeapDataOutputStream output = new HeapDataOutputStream(Version.CURRENT);
+    DataSerializer.writeString("12345678901234567890123456789012345", output);
+    byte[] encoded = output.toByteArray();
+
+    byte expectedHeader = DSCODE.STRING_BYTES.toByte();
+    byte actualHeader = encoded[0];
+    if (actualHeader != expectedHeader) {
+      throw new IllegalStateException(
+          "expected first byte to be " + expectedHeader + " but it was " + actualHeader);
+    }
+
+    // readString is handed the header byte separately, so the input must start after it.
+    serializedBytes = Arrays.copyOfRange(encoded, 1, encoded.length);
+  }
+
+  /**
+   * Decodes the prepared payload once.
+   *
+   * @return the decoded string, returned so that JMH consumes the result
+   */
+  @Benchmark
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  @Warmup(iterations = 3)
+  @Measurement(iterations = 10)
+  public String readStringBenchmark() throws IOException {
+    dataInput.initialize(serializedBytes, Version.CURRENT);
+    return InternalDataSerializer.readString(dataInput, DSCODE.STRING_BYTES.toByte());
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index 6b17322..ce98894 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -2593,14 +2593,18 @@ public abstract class InternalDataSerializer extends DataSerializer {
return in.readUTF();
}
+ /**
+ * Per-thread scratch buffer for STRING_BYTES reads. Arrays of up to 65535 bytes are kept and
+ * reused by the reading thread; longer requests get a fresh array that is not cached.
+ */
+ @MakeNotStatic("not tied to the cache lifecycle")
+ private static final ThreadLocalByteArrayCache threadLocalByteArrayCache =
+ new ThreadLocalByteArrayCache(65535);
+
+ /**
+ * Reads {@code len} bytes from {@code dataInput} and turns each byte into one char whose high
+ * byte is zero.
+ *
+ * @param dataInput the source of the encoded bytes
+ * @param len number of bytes to read; assumed non-negative (presumably read by the caller as an
+ *        unsigned length -- verify against callers)
+ * @return the decoded string
+ * @throws IOException if {@code dataInput} cannot supply {@code len} bytes
+ */
private static String readStringBytesFromDataInput(DataInput dataInput, int len)
throws IOException {
if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading STRING_BYTES of len={}", len);
}
- byte[] buf = new byte[len];
+ // The cached array may be longer than len; only its first len bytes are filled below.
+ byte[] buf = threadLocalByteArrayCache.get(len);
dataInput.readFully(buf, 0, len);
- return new String(buf, 0); // intentionally using deprecated constructor
+ // len must be passed explicitly: bytes past len can be stale data from an earlier, longer read.
+ // Reusing buf is safe because the String constructor copies the bytes before returning.
+ return new String(buf, 0, 0, len); // intentionally using deprecated constructor
}
public static String readString(DataInput in, byte header) throws IOException {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/ThreadLocalByteArrayCache.java b/geode-core/src/main/java/org/apache/geode/internal/ThreadLocalByteArrayCache.java
new file mode 100644
index 0000000..80b859a
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/ThreadLocalByteArrayCache.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal;
+
+import java.lang.ref.SoftReference;
+
+/**
+ * Provides a simple thread-local cache of a single byte array.
+ *
+ * <p>
+ * Each thread that calls {@link #get(int)} only ever sees its own cached array. The array is held
+ * through a {@link SoftReference}, so the garbage collector may reclaim it when memory runs low;
+ * the next call then allocates a replacement.
+ */
+public class ThreadLocalByteArrayCache {
+  private final ThreadLocal<SoftReference<byte[]>> cache = new ThreadLocal<>();
+  private final int maximumArraySize;
+
+  /**
+   * @param maximumArraySize byte arrays larger than this size will not be cached
+   */
+  public ThreadLocalByteArrayCache(int maximumArraySize) {
+    this.maximumArraySize = maximumArraySize;
+  }
+
+  /**
+   * Returns a byte array whose length is at least {@code minimumLength}.
+   *
+   * <p>
+   * NOTE: the same thread cannot safely call this method again until it has finished using the
+   * byte array returned by a previous call, because both calls may return the same array. The
+   * array is not cleared, so it may still hold bytes from an earlier use.
+   *
+   * <p>
+   * If {@code minimumLength} is larger than this cache's maximumArraySize, a new array is
+   * returned without being cached, and the array already cached for this thread (if any) is kept.
+   *
+   * @param minimumLength the minimum length of the byte array; must not be negative
+   * @return a byte array, owned by this thread, whose length is at least minimumLength
+   * @throws IllegalArgumentException if {@code minimumLength} is negative
+   */
+  public byte[] get(int minimumLength) {
+    // Reject negative lengths consistently; previously they either failed in "new byte[]" or
+    // silently returned the cached array, depending on cache state.
+    if (minimumLength < 0) {
+      throw new IllegalArgumentException(
+          "minimumLength must not be negative but was " + minimumLength);
+    }
+    SoftReference<byte[]> reference = cache.get();
+    // reference.get() yields null once the garbage collector has cleared the soft reference.
+    byte[] result = (reference != null) ? reference.get() : null;
+    if (result == null || result.length < minimumLength) {
+      result = new byte[minimumLength];
+      if (minimumLength <= maximumArraySize) {
+        cache.set(new SoftReference<>(result));
+      }
+    }
+    return result;
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/ThreadLocalByteArrayCacheTest.java b/geode-core/src/test/java/org/apache/geode/internal/ThreadLocalByteArrayCacheTest.java
new file mode 100644
index 0000000..65a1e58
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/ThreadLocalByteArrayCacheTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ThreadLocalByteArrayCache}, using a cache that only keeps arrays of at
+ * most 2 bytes.
+ */
+public class ThreadLocalByteArrayCacheTest {
+  private final ThreadLocalByteArrayCache instance = new ThreadLocalByteArrayCache(2);
+
+  @Test
+  public void emptyArrayReturned() {
+    byte[] byteArray = instance.get(0);
+
+    assertThat(byteArray).hasSize(0);
+  }
+
+  @Test
+  public void largerRequestCreatesNewArray() {
+    byte[] byteArrayZero = instance.get(0);
+    byte[] byteArrayOne = instance.get(1);
+
+    assertThat(byteArrayZero).hasSize(0);
+    assertThat(byteArrayOne).hasSize(1);
+  }
+
+  @Test
+  public void smallerRequestReturnsPreviousArray() {
+    byte[] byteArrayOne = instance.get(1);
+    byte[] byteArrayZero = instance.get(0);
+
+    assertThat(byteArrayOne).hasSize(1);
+    assertThat(byteArrayZero).isSameAs(byteArrayOne);
+  }
+
+  @Test
+  public void requestsLargerThanMaximumAreNotCached() {
+    byte[] byteArrayLarge = instance.get(100);
+    byte[] byteArrayZero = instance.get(0);
+
+    assertThat(byteArrayLarge).hasSize(100);
+    assertThat(byteArrayZero).isNotSameAs(byteArrayLarge);
+  }
+
+  @Test
+  public void requestLargerThanMaximumKeepsPreviouslyCachedArray() {
+    byte[] byteArrayOne = instance.get(1);
+    byte[] byteArrayLarge = instance.get(100);
+    byte[] byteArrayAgain = instance.get(1);
+
+    assertThat(byteArrayLarge).hasSize(100);
+    assertThat(byteArrayAgain).isSameAs(byteArrayOne);
+  }
+
+  @Test
+  public void equalRequestReturnsPreviousArray() {
+    byte[] byteArrayFirst = instance.get(1);
+    byte[] byteArraySecond = instance.get(1);
+
+    assertThat(byteArrayFirst).hasSize(1);
+    assertThat(byteArraySecond).isSameAs(byteArrayFirst);
+  }
+
+  @Test
+  public void threadsGetDifferentByteArrays() throws InterruptedException {
+    byte[] byteArrayZero = instance.get(0);
+    final AtomicReference<byte[]> byteArrayHolder = new AtomicReference<>();
+    Thread thread = new Thread(() -> byteArrayHolder.set(instance.get(0)));
+    thread.start();
+    thread.join();
+
+    assertThat(byteArrayZero).hasSize(0);
+    assertThat(byteArrayHolder.get()).hasSize(0);
+    assertThat(byteArrayHolder.get()).isNotSameAs(byteArrayZero);
+  }
+}