You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/04/17 20:17:43 UTC

[geode] branch develop updated: GEODE-6579: optimize string deserialization (#3381)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new def65d4  GEODE-6579: optimize string deserialization (#3381)
def65d4 is described below

commit def65d4d9018c43a56ea0de98fe4640c0d4e40c5
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Wed Apr 17 13:17:24 2019 -0700

    GEODE-6579: optimize string deserialization (#3381)
    
    Added ThreadLocalByteArrayCache, which readString uses to decode ASCII strings;
    buffers of up to 65535 bytes are cached.
    Also added a jmh benchmark for InternalDataSerializer.readString.
    The cache uses a SoftReference to allow the cached byte array
    to be garbage collected if the jvm runs low on memory.
---
 .../internal/InternalDataSerializerBenchmark.java  | 72 ++++++++++++++++++
 .../geode/internal/InternalDataSerializer.java     |  8 +-
 .../geode/internal/ThreadLocalByteArrayCache.java  | 56 ++++++++++++++
 .../internal/ThreadLocalByteArrayCacheTest.java    | 85 ++++++++++++++++++++++
 4 files changed, 219 insertions(+), 2 deletions(-)

diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java b/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java
new file mode 100644
index 0000000..1417a50
--- /dev/null
+++ b/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import org.apache.geode.DataSerializer;
+
+
+/**
+ * JMH benchmark that measures the throughput of {@link InternalDataSerializer#readString}
+ * for a short string serialized as STRING_BYTES.
+ */
+@State(Scope.Thread)
+@Fork(1)
+public class InternalDataSerializerBenchmark {
+
+  private final ByteArrayDataInput dataInput = new ByteArrayDataInput();
+  private byte[] serializedBytes;
+
+  /**
+   * Serializes the sample string once per trial and keeps only the bytes that follow the
+   * header byte, because readString receives the header as a separate argument.
+   */
+  @Setup(Level.Trial)
+  public void setup() throws IOException {
+    HeapDataOutputStream output = new HeapDataOutputStream(Version.CURRENT);
+    DataSerializer.writeString("12345678901234567890123456789012345", output);
+    byte[] encoded = output.toByteArray();
+    byte actualHeader = encoded[0];
+    byte expectedHeader = DSCODE.STRING_BYTES.toByte();
+    if (actualHeader != expectedHeader) {
+      throw new IllegalStateException(
+          "expected first byte to be " + expectedHeader + " but it was " + actualHeader);
+    }
+    serializedBytes = Arrays.copyOfRange(encoded, 1, encoded.length);
+  }
+
+  /**
+   * Decodes the pre-serialized payload. The shared input is re-initialized on every call so
+   * each invocation reads the same bytes; the decoded String is returned so JMH consumes it.
+   */
+  @Benchmark
+  @Measurement(iterations = 10)
+  @Warmup(iterations = 3)
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public String readStringBenchmark() throws IOException {
+    dataInput.initialize(serializedBytes, Version.CURRENT);
+    return InternalDataSerializer.readString(dataInput, DSCODE.STRING_BYTES.toByte());
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index 6b17322..ce98894 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -2593,14 +2593,18 @@ public abstract class InternalDataSerializer extends DataSerializer {
     return in.readUTF();
   }
 
+  /**
+   * Per-thread scratch buffer for readStringBytesFromDataInput, replacing a fresh
+   * {@code new byte[len]} on every call. Arrays longer than 65535 bytes are still
+   * allocated but are not kept in the cache.
+   */
+  @MakeNotStatic("not tied to the cache lifecycle")
+  private static final ThreadLocalByteArrayCache threadLocalByteArrayCache =
+      new ThreadLocalByteArrayCache(65535);

+  /**
+   * Reads len bytes from dataInput and builds a String with one char per byte (high byte 0).
+   * The bytes are staged in this thread's cached buffer; the String constructor copies them,
+   * so the buffer may be reused as soon as this method returns.
+   * NOTE(review): assumes dataInput.readFully never re-enters this method on the same thread,
+   * which would overwrite the shared buffer mid-read.
+   */
   private static String readStringBytesFromDataInput(DataInput dataInput, int len)
       throws IOException {
     if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
       logger.trace(LogMarker.SERIALIZER_VERBOSE, "Reading STRING_BYTES of len={}", len);
     }
-    byte[] buf = new byte[len];
+    // buf may be longer than len; only its first len bytes are filled and decoded below
+    byte[] buf = threadLocalByteArrayCache.get(len);
     dataInput.readFully(buf, 0, len);
-    return new String(buf, 0); // intentionally using deprecated constructor
+    return new String(buf, 0, 0, len); // intentionally using deprecated constructor
   }
 
   public static String readString(DataInput in, byte header) throws IOException {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/ThreadLocalByteArrayCache.java b/geode-core/src/main/java/org/apache/geode/internal/ThreadLocalByteArrayCache.java
new file mode 100644
index 0000000..80b859a
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/ThreadLocalByteArrayCache.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal;
+
+import java.lang.ref.SoftReference;
+
+/**
+ * Provides a simple thread local cache of a single byte array.
+ * Each thread sees only its own array, which is held through a {@link SoftReference}
+ * so the garbage collector may reclaim it when the jvm runs low on memory.
+ */
+public class ThreadLocalByteArrayCache {
+  private final ThreadLocal<SoftReference<byte[]>> cache = new ThreadLocal<>();
+  private final int maximumArraySize;
+
+  /**
+   * @param maximumArraySize byte arrays larger than this size will not be cached
+   */
+  public ThreadLocalByteArrayCache(int maximumArraySize) {
+    this.maximumArraySize = maximumArraySize;
+  }
+
+  /**
+   * Returns a byte array whose length is at least minimumLength.
+   * NOTE: the same thread can not safely call this method again
+   * until it has finished using the byte array returned by a previous call.
+   * If minimumLength is larger than this cache's maximumArraySize
+   * then the returned byte array will not be cached, and any array
+   * previously cached for this thread is kept.
+   *
+   * @param minimumLength the minimum length of the byte array
+   * @return a byte array, owned by this thread, whose length is at least minimumLength.
+   *         Its contents are unspecified; a reused array may still hold earlier data.
+   * @throws NegativeArraySizeException if minimumLength is negative
+   */
+  public byte[] get(int minimumLength) {
+    if (minimumLength < 0) {
+      // Checked up front so a negative length fails the same way whether or not an array is
+      // currently cached; otherwise a cached array would silently be returned.
+      throw new NegativeArraySizeException(
+          "minimumLength must not be negative: " + minimumLength);
+    }
+    SoftReference<byte[]> reference = cache.get();
+    byte[] result = (reference != null) ? reference.get() : null;
+    if (result == null || result.length < minimumLength) {
+      result = new byte[minimumLength];
+      if (minimumLength <= maximumArraySize) {
+        cache.set(new SoftReference<>(result));
+      }
+    }
+    return result;
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/ThreadLocalByteArrayCacheTest.java b/geode-core/src/test/java/org/apache/geode/internal/ThreadLocalByteArrayCacheTest.java
new file mode 100644
index 0000000..65a1e58
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/ThreadLocalByteArrayCacheTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ThreadLocalByteArrayCache}. The instance under test only caches
+ * arrays whose length is 2 or less.
+ */
+public class ThreadLocalByteArrayCacheTest {
+  private final ThreadLocalByteArrayCache instance = new ThreadLocalByteArrayCache(2);
+
+  @Test
+  public void emptyArrayReturned() {
+    byte[] byteArray = instance.get(0);
+
+    assertThat(byteArray).hasSize(0);
+  }
+
+  @Test
+  public void largerRequestCreatesNewArray() {
+    byte[] byteArrayZero = instance.get(0);
+    byte[] byteArrayOne = instance.get(1);
+
+    assertThat(byteArrayZero).hasSize(0);
+    assertThat(byteArrayOne).hasSize(1);
+    // the larger array is within the maximum, so it replaces the previously cached one
+    assertThat(instance.get(1)).isSameAs(byteArrayOne);
+  }
+
+  @Test
+  public void smallerRequestReturnsPreviousArray() {
+    byte[] byteArrayOne = instance.get(1);
+    byte[] byteArrayZero = instance.get(0);
+
+    assertThat(byteArrayOne).hasSize(1);
+    assertThat(byteArrayZero).isSameAs(byteArrayOne);
+  }
+
+  @Test
+  public void requestsLargerThanMaximumAreNotCached() {
+    byte[] byteArrayLarge = instance.get(100);
+    byte[] byteArrayZero = instance.get(0);
+
+    assertThat(byteArrayLarge).hasSize(100);
+    assertThat(byteArrayZero).isNotSameAs(byteArrayLarge);
+  }
+
+  @Test
+  public void requestLargerThanMaximumKeepsPreviouslyCachedArray() {
+    byte[] byteArrayOne = instance.get(1);
+    byte[] byteArrayLarge = instance.get(100);
+
+    assertThat(byteArrayLarge).hasSize(100);
+    assertThat(instance.get(1)).isSameAs(byteArrayOne);
+  }
+
+  @Test
+  public void equalRequestReturnsPreviousArray() {
+    byte[] byteArrayFirst = instance.get(1);
+    byte[] byteArraySecond = instance.get(1);
+
+    assertThat(byteArrayFirst).hasSize(1);
+    assertThat(byteArraySecond).isSameAs(byteArrayFirst);
+  }
+
+  @Test
+  public void threadsGetDifferentByteArrays() throws InterruptedException {
+    byte[] byteArrayZero = instance.get(0);
+    AtomicReference<byte[]> byteArrayHolder = new AtomicReference<>();
+    Thread thread = new Thread(() -> byteArrayHolder.set(instance.get(0)));
+    thread.start();
+    thread.join();
+
+    assertThat(byteArrayZero).hasSize(0);
+    assertThat(byteArrayHolder.get()).hasSize(0);
+    assertThat(byteArrayHolder.get()).isNotSameAs(byteArrayZero);
+  }
+}