You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/05/08 16:58:00 UTC

[geode] branch develop updated: GEODE-6686: prevent array allocation in FilterInfo toData (#3526)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new e16910f  GEODE-6686: prevent array allocation in FilterInfo toData (#3526)
e16910f is described below

commit e16910f7679cec5a93261159e69408f4ea36c32b
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Wed May 8 09:57:48 2019 -0700

    GEODE-6686: prevent array allocation in FilterInfo toData (#3526)
    
    FilterInfo.toData no longer allocates a heap ByteBuffer
    every time it is called. Instead it uses the ThreadLocal byte array.
---
 .../codeAnalysis/sanctionedDataSerializables.txt   |  2 +-
 .../geode/internal/HeapDataOutputStream.java       | 18 +++++--
 .../geode/internal/InternalDataSerializer.java     | 13 ++++-
 .../geode/internal/cache/FilterRoutingInfo.java    | 12 ++---
 .../geode/internal/cache/FilterInfoTest.java       | 61 ++++++++++++++++++++++
 5 files changed, 94 insertions(+), 12 deletions(-)

diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 923e751..80ee315 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1020,7 +1020,7 @@ toDataPre_GFE_7_1_0_0,88
 org/apache/geode/internal/cache/FilterRoutingInfo$FilterInfo,4
 fromData,9
 fromDataPre_GFE_8_0_0_0,50
-toData,255
+toData,247
 toDataPre_GFE_8_0_0_0,213
 
 org/apache/geode/internal/cache/FindDurableQueueProcessor$FindDurableQueueMessage,2
diff --git a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
index 4d165b2..8f021bf 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
@@ -184,6 +184,14 @@ public class HeapDataOutputStream extends OutputStream
     this.memoPosition = this.buffer.position();
   }
 
+  /**
+   * If this HeapDataOutputStream detects that it needs to
+   * grow then it will throw an IllegalStateException.
+   */
+  public void disallowExpansion() {
+    this.disallowExpansion = true;
+  }
+
   /** write the low-order 8 bits of the given int */
   @Override
   public void write(int b) {
@@ -203,9 +211,13 @@ public class HeapDataOutputStream extends OutputStream
 
   private void expand(int amount) {
     if (this.disallowExpansion) {
-      this.buffer.position(this.memoPosition);
-      this.ignoreWrites = true;
-      throw this.expansionException;
+      if (this.expansionException != null) {
+        this.ignoreWrites = true;
+        this.buffer.position(this.memoPosition);
+        throw this.expansionException;
+      } else {
+        throw new IllegalStateException("initial buffer size was exceeded");
+      }
     }
 
     final ByteBuffer oldBuffer = this.buffer;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index 4be75b4..0c9c9c7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -2597,6 +2597,17 @@ public abstract class InternalDataSerializer extends DataSerializer {
   private static final ThreadLocalByteArrayCache threadLocalByteArrayCache =
       new ThreadLocalByteArrayCache(65535);
 
+  /**
+   * Returns a byte array for use by the calling thread.
+   * The returned byte array may be longer than minimumLength.
+   * The byte array belongs to the calling thread but callers must
+   * be careful to not call other methods that may also use this
+   * byte array.
+   */
+  public static byte[] getThreadLocalByteArray(int minimumLength) {
+    return threadLocalByteArrayCache.get(minimumLength);
+  }
+
   private static String readStringBytesFromDataInput(DataInput dataInput, int len)
       throws IOException {
     if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
@@ -2605,7 +2616,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
     if (len == 0) {
       return "";
     }
-    byte[] buf = threadLocalByteArrayCache.get(len);
+    byte[] buf = getThreadLocalByteArray(len);
     dataInput.readFully(buf, 0, len);
     return new String(buf, 0, 0, len); // intentionally using deprecated constructor
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
index 835b3e9..7d9e7ef 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
@@ -380,7 +380,9 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
       size += interestedClients == null ? 4 : interestedClients.size() * 8 + 5;
       size += interestedClientsInv == null ? 4 : interestedClientsInv.size() * 8 + 5;
       size += cqs == null ? 0 : cqs.size() * 12;
-      hdos = new HeapDataOutputStream(size, null);
+      byte[] myData = InternalDataSerializer.getThreadLocalByteArray(size);
+      hdos = new HeapDataOutputStream(myData);
+      hdos.disallowExpansion();
       if (this.cqs == null) {
         hdos.writeBoolean(false);
       } else {
@@ -396,12 +398,8 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
       }
       InternalDataSerializer.writeSetOfLongs(this.interestedClients, this.longIDs, hdos);
       InternalDataSerializer.writeSetOfLongs(this.interestedClientsInv, this.longIDs, hdos);
-      if (out instanceof HeapDataOutputStream) {
-        ((ObjToByteArraySerializer) out).writeAsSerializedByteArray(hdos);
-      } else {
-        byte[] myData = hdos.toByteArray();
-        DataSerializer.writeByteArray(myData, out);
-      }
+      hdos.finishWriting();
+      DataSerializer.writeByteArray(myData, hdos.size(), out);
     }
 
     public void fromDataPre_GFE_8_0_0_0(DataInput in) throws IOException, ClassNotFoundException {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java
new file mode 100644
index 0000000..40ad820
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.geode.internal.ByteArrayDataInput;
+import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
+
+/**
+ * Unit test for FilterRoutingInfo.FilterInfo
+ */
+public class FilterInfoTest {
+  @Test
+  public void validateSerialization() throws IOException, ClassNotFoundException {
+    FilterInfo serialized = new FilterInfo();
+    HashMap<Long, Integer> cqs = new HashMap<>();
+    cqs.put(1L, 1);
+    cqs.put(2L, 2);
+    serialized.setCQs(cqs);
+    Set<Long> clients = new HashSet<>();
+    clients.add(1L);
+    clients.add(2L);
+    serialized.setInterestedClients(clients);
+    Set<Long> clientsInv = new HashSet<>();
+    clientsInv.add(3L);
+    clientsInv.add(4L);
+    serialized.setInterestedClientsInv(clientsInv);
+    HeapDataOutputStream dataOut = new HeapDataOutputStream(Version.CURRENT);
+    serialized.toData(dataOut);
+    byte[] outputBytes = dataOut.toByteArray();
+    FilterInfo deserialized = new FilterInfo();
+    ByteArrayDataInput dataInput = new ByteArrayDataInput();
+    dataInput.initialize(outputBytes, Version.CURRENT);
+    deserialized.fromData(dataInput);
+    assertThat(deserialized.getCQs()).isEqualTo(cqs);
+    assertThat(deserialized.getInterestedClients()).isEqualTo(clients);
+    assertThat(deserialized.getInterestedClientsInv()).isEqualTo(clientsInv);
+  }
+}