You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/05/08 16:58:00 UTC
[geode] branch develop updated: GEODE-6686: prevent array allocation in FilterInfo toData (#3526)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new e16910f GEODE-6686: prevent array allocation in FilterInfo toData (#3526)
e16910f is described below
commit e16910f7679cec5a93261159e69408f4ea36c32b
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Wed May 8 09:57:48 2019 -0700
GEODE-6686: prevent array allocation in FilterInfo toData (#3526)
FilterInfo.toData no longer allocates a heap ByteBuffer
every time it is called. Instead it uses the ThreadLocal byte array.
---
.../codeAnalysis/sanctionedDataSerializables.txt | 2 +-
.../geode/internal/HeapDataOutputStream.java | 18 +++++--
.../geode/internal/InternalDataSerializer.java | 13 ++++-
.../geode/internal/cache/FilterRoutingInfo.java | 12 ++---
.../geode/internal/cache/FilterInfoTest.java | 61 ++++++++++++++++++++++
5 files changed, 94 insertions(+), 12 deletions(-)
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 923e751..80ee315 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1020,7 +1020,7 @@ toDataPre_GFE_7_1_0_0,88
org/apache/geode/internal/cache/FilterRoutingInfo$FilterInfo,4
fromData,9
fromDataPre_GFE_8_0_0_0,50
-toData,255
+toData,247
toDataPre_GFE_8_0_0_0,213
org/apache/geode/internal/cache/FindDurableQueueProcessor$FindDurableQueueMessage,2
diff --git a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
index 4d165b2..8f021bf 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
@@ -184,6 +184,14 @@ public class HeapDataOutputStream extends OutputStream
this.memoPosition = this.buffer.position();
}
+ /**
+ * After this call, a write that needs to grow the buffer fails in expand(): it throws
+ * the expansionException if one is set, otherwise an IllegalStateException.
+ */
+ public void disallowExpansion() {
+ this.disallowExpansion = true;
+ }
+ }
+
/** write the low-order 8 bits of the given int */
@Override
public void write(int b) {
@@ -203,9 +211,13 @@ public class HeapDataOutputStream extends OutputStream
private void expand(int amount) {
if (this.disallowExpansion) {
- this.buffer.position(this.memoPosition);
- this.ignoreWrites = true;
- throw this.expansionException;
+ if (this.expansionException != null) {
+ this.ignoreWrites = true;
+ this.buffer.position(this.memoPosition);
+ throw this.expansionException;
+ } else {
+ throw new IllegalStateException("initial buffer size was exceeded");
+ }
}
final ByteBuffer oldBuffer = this.buffer;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index 4be75b4..0c9c9c7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -2597,6 +2597,17 @@ public abstract class InternalDataSerializer extends DataSerializer {
private static final ThreadLocalByteArrayCache threadLocalByteArrayCache =
new ThreadLocalByteArrayCache(65535);
+ /**
+ * Returns a byte array from threadLocalByteArrayCache for use by the calling thread.
+ * The returned byte array may be longer than minimumLength.
+ * The byte array belongs to the calling thread but callers must
+ * be careful to not call other methods that may also use this
+ * byte array; readStringBytesFromDataInput is one such method.
+ */
+ public static byte[] getThreadLocalByteArray(int minimumLength) {
+ return threadLocalByteArrayCache.get(minimumLength);
+ }
+
private static String readStringBytesFromDataInput(DataInput dataInput, int len)
throws IOException {
if (logger.isTraceEnabled(LogMarker.SERIALIZER_VERBOSE)) {
@@ -2605,7 +2616,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
if (len == 0) {
return "";
}
- byte[] buf = threadLocalByteArrayCache.get(len);
+ byte[] buf = getThreadLocalByteArray(len);
dataInput.readFully(buf, 0, len);
return new String(buf, 0, 0, len); // intentionally using deprecated constructor
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
index 835b3e9..7d9e7ef 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
@@ -380,7 +380,9 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
size += interestedClients == null ? 4 : interestedClients.size() * 8 + 5;
size += interestedClientsInv == null ? 4 : interestedClientsInv.size() * 8 + 5;
size += cqs == null ? 0 : cqs.size() * 12;
- hdos = new HeapDataOutputStream(size, null);
+ byte[] myData = InternalDataSerializer.getThreadLocalByteArray(size);
+ hdos = new HeapDataOutputStream(myData);
+ hdos.disallowExpansion();
if (this.cqs == null) {
hdos.writeBoolean(false);
} else {
@@ -396,12 +398,8 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
}
InternalDataSerializer.writeSetOfLongs(this.interestedClients, this.longIDs, hdos);
InternalDataSerializer.writeSetOfLongs(this.interestedClientsInv, this.longIDs, hdos);
- if (out instanceof HeapDataOutputStream) {
- ((ObjToByteArraySerializer) out).writeAsSerializedByteArray(hdos);
- } else {
- byte[] myData = hdos.toByteArray();
- DataSerializer.writeByteArray(myData, out);
- }
+ hdos.finishWriting();
+ DataSerializer.writeByteArray(myData, hdos.size(), out);
}
public void fromDataPre_GFE_8_0_0_0(DataInput in) throws IOException, ClassNotFoundException {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java
new file mode 100644
index 0000000..40ad820
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.geode.internal.ByteArrayDataInput;
+import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
+
+/**
+ * Unit test for FilterRoutingInfo.FilterInfo: round-trips CQs and client sets via toData/fromData.
+ */
+public class FilterInfoTest {
+ @Test
+ public void validateSerialization() throws IOException, ClassNotFoundException {
+ FilterInfo serialized = new FilterInfo();
+ HashMap<Long, Integer> cqs = new HashMap<>();
+ cqs.put(1L, 1);
+ cqs.put(2L, 2);
+ serialized.setCQs(cqs);
+ Set<Long> clients = new HashSet<>();
+ clients.add(1L);
+ clients.add(2L);
+ serialized.setInterestedClients(clients);
+ Set<Long> clientsInv = new HashSet<>();
+ clientsInv.add(3L);
+ clientsInv.add(4L);
+ serialized.setInterestedClientsInv(clientsInv);
+ HeapDataOutputStream dataOut = new HeapDataOutputStream(Version.CURRENT);
+ serialized.toData(dataOut);
+ byte[] outputBytes = dataOut.toByteArray();
+ FilterInfo deserialized = new FilterInfo();
+ ByteArrayDataInput dataInput = new ByteArrayDataInput();
+ dataInput.initialize(outputBytes, Version.CURRENT);
+ deserialized.fromData(dataInput);
+ assertThat(deserialized.getCQs()).isEqualTo(cqs);
+ assertThat(deserialized.getInterestedClients()).isEqualTo(clients);
+ assertThat(deserialized.getInterestedClientsInv()).isEqualTo(clientsInv);
+ }
+}