You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2018/05/17 00:24:24 UTC

[geode] branch feature/transcoding_experiments updated: Fixing an issue with shared lz4 streams in the protobuf server

This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch feature/transcoding_experiments
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/transcoding_experiments by this push:
     new 3bfdccf  Fixing an issue with shared lz4 streams in the protobuf server
3bfdccf is described below

commit 3bfdccf1fa55a025f4494fbf8b9e2eb8a48e7822
Author: Dan Smith <up...@apache.org>
AuthorDate: Wed May 16 17:23:38 2018 -0700

    Fixing an issue with shared lz4 streams in the protobuf server
    
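    We were sharing the lz4 streams between threads (they were fields of
    ProtobufStreamProcessor), causing issues with multithreaded tests.
    The streams are now held by the MessageExecutionContext instead.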
---
 .../protobuf/v1/MessageExecutionContext.java       | 24 ++++++++++++++++++++++
 .../protobuf/v1/ProtobufStreamProcessor.java       | 17 +++++++--------
 2 files changed, 31 insertions(+), 10 deletions(-)

diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java
index 48a500c..8420b17 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1;
 
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Properties;
 
 import org.apache.geode.annotations.Experimental;
@@ -29,6 +31,11 @@ import org.apache.geode.protocol.serialization.ValueSerializer;
 
 @Experimental
 public abstract class MessageExecutionContext {
+
+  private InputStream compressionInputStream;
+
+  private OutputStream compressionOutputStream;
+
   protected final ClientStatistics statistics;
   protected final SecurityService securityService;
   protected ConnectionState connectionState;
@@ -69,4 +76,21 @@ public abstract class MessageExecutionContext {
   public abstract void authenticate(Properties properties);
 
   public abstract void setValueSerializer(ValueSerializer valueSerializer);
+
+  public OutputStream getCompressionOutputStream() {
+    return compressionOutputStream;
+  }
+
+  public void setCompressionOutputStream(final OutputStream compressionOutputStream) {
+    this.compressionOutputStream = compressionOutputStream;
+  }
+
+  public InputStream getCompressionInputStream() {
+    return compressionInputStream;
+  }
+
+  public void setCompressionInputStream(final InputStream compressionInputStream) {
+    this.compressionInputStream = compressionInputStream;
+  }
+
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
index f0ad7b3..5cac0f6 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
@@ -44,8 +44,6 @@ public class ProtobufStreamProcessor {
   private final ProtobufOpsProcessor protobufOpsProcessor;
   private static final Logger logger = LogService.getLogger();
 
-  private InputStream compressionInputStream;
-  private OutputStream compressionOutputStream;
 
   public ProtobufStreamProcessor() {
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
@@ -54,11 +52,9 @@ public class ProtobufStreamProcessor {
 
   public void receiveMessage(InputStream inputStream, OutputStream outputStream,
       MessageExecutionContext executionContext) throws IOException {
-    if (this.compressionInputStream != null) {
-      inputStream = compressionInputStream;
-    }
-    if (this.compressionOutputStream != null) {
-      outputStream = compressionOutputStream;
+    if (executionContext.getCompressionInputStream() != null) {
+      inputStream = executionContext.getCompressionInputStream();
+      outputStream = executionContext.getCompressionOutputStream();
     }
     try {
       processOneMessage(inputStream, outputStream, executionContext);
@@ -91,11 +87,12 @@ public class ProtobufStreamProcessor {
     protobufProtocolSerializer.serialize(response, outputStream);
     outputStream.flush();
 
-    if (message.hasHandshakeRequest() && compressionInputStream == null
+    if (message.hasHandshakeRequest() && executionContext.getCompressionInputStream() == null
         && (executionContext.getConnectionState() instanceof AcceptMessages) && USE_LZ4
         && !(executionContext instanceof LocatorMessageExecutionContext)) {
-      compressionOutputStream = new LZ4FrameOutputStream(outputStream, BLOCKSIZE.SIZE_64KB);
-      compressionInputStream = new LZ4FrameInputStream(inputStream);
+      executionContext
+          .setCompressionOutputStream(new LZ4FrameOutputStream(outputStream, BLOCKSIZE.SIZE_64KB));
+      executionContext.setCompressionInputStream(new LZ4FrameInputStream(inputStream));
     }
   }
 }

-- 
To stop receiving notification emails like this one, please contact
upthewaterspout@apache.org.