You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2018/05/17 00:24:24 UTC
[geode] branch feature/transcoding_experiments updated: Fixing an
issue with shared lz4 streams in the protobuf server
This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch feature/transcoding_experiments
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/transcoding_experiments by this push:
new 3bfdccf Fixing an issue with shared lz4 streams in the protobuf server
3bfdccf is described below
commit 3bfdccf1fa55a025f4494fbf8b9e2eb8a48e7822
Author: Dan Smith <up...@apache.org>
AuthorDate: Wed May 16 17:23:38 2018 -0700
Fixing an issue with shared lz4 streams in the protobuf server
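We were sharing the lz4 streams between threads, causing issues with
multithreaded tests. The compression streams are now stored on the
MessageExecutionContext passed to receiveMessage instead of being kept
as fields of ProtobufStreamProcessor.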
---
.../protobuf/v1/MessageExecutionContext.java | 24 ++++++++++++++++++++++
.../protobuf/v1/ProtobufStreamProcessor.java | 17 +++++++--------
2 files changed, 31 insertions(+), 10 deletions(-)
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java
index 48a500c..8420b17 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.internal.protocol.protobuf.v1;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Properties;
import org.apache.geode.annotations.Experimental;
@@ -29,6 +31,11 @@ import org.apache.geode.protocol.serialization.ValueSerializer;
@Experimental
public abstract class MessageExecutionContext {
+
+ private InputStream compressionInputStream;
+
+ private OutputStream compressionOutputStream;
+
protected final ClientStatistics statistics;
protected final SecurityService securityService;
protected ConnectionState connectionState;
@@ -69,4 +76,21 @@ public abstract class MessageExecutionContext {
public abstract void authenticate(Properties properties);
public abstract void setValueSerializer(ValueSerializer valueSerializer);
+
+ public OutputStream getCompressionOutputStream() {
+ return compressionOutputStream;
+ }
+
+ public void setCompressionOutputStream(final OutputStream compressionOutputStream) {
+ this.compressionOutputStream = compressionOutputStream;
+ }
+
+ public InputStream getCompressionInputStream() {
+ return compressionInputStream;
+ }
+
+ public void setCompressionInputStream(final InputStream compressionInputStream) {
+ this.compressionInputStream = compressionInputStream;
+ }
+
}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
index f0ad7b3..5cac0f6 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
@@ -44,8 +44,6 @@ public class ProtobufStreamProcessor {
private final ProtobufOpsProcessor protobufOpsProcessor;
private static final Logger logger = LogService.getLogger();
- private InputStream compressionInputStream;
- private OutputStream compressionOutputStream;
public ProtobufStreamProcessor() {
protobufProtocolSerializer = new ProtobufProtocolSerializer();
@@ -54,11 +52,9 @@ public class ProtobufStreamProcessor {
public void receiveMessage(InputStream inputStream, OutputStream outputStream,
MessageExecutionContext executionContext) throws IOException {
- if (this.compressionInputStream != null) {
- inputStream = compressionInputStream;
- }
- if (this.compressionOutputStream != null) {
- outputStream = compressionOutputStream;
+ if (executionContext.getCompressionInputStream() != null) {
+ inputStream = executionContext.getCompressionInputStream();
+ outputStream = executionContext.getCompressionOutputStream();
}
try {
processOneMessage(inputStream, outputStream, executionContext);
@@ -91,11 +87,12 @@ public class ProtobufStreamProcessor {
protobufProtocolSerializer.serialize(response, outputStream);
outputStream.flush();
- if (message.hasHandshakeRequest() && compressionInputStream == null
+ if (message.hasHandshakeRequest() && executionContext.getCompressionInputStream() == null
&& (executionContext.getConnectionState() instanceof AcceptMessages) && USE_LZ4
&& !(executionContext instanceof LocatorMessageExecutionContext)) {
- compressionOutputStream = new LZ4FrameOutputStream(outputStream, BLOCKSIZE.SIZE_64KB);
- compressionInputStream = new LZ4FrameInputStream(inputStream);
+ executionContext
+ .setCompressionOutputStream(new LZ4FrameOutputStream(outputStream, BLOCKSIZE.SIZE_64KB));
+ executionContext.setCompressionInputStream(new LZ4FrameInputStream(inputStream));
}
}
}
--
To stop receiving notification emails like this one, please contact
upthewaterspout@apache.org.