You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2018/05/16 21:41:41 UTC

[geode] branch feature/transcoding_experiments updated: Adding a hacky feature flag to use lz4 compression with protobuf

This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch feature/transcoding_experiments
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/transcoding_experiments by this push:
     new dc6bc6b  Adding a hacky feature flag to use lz4 compression with protobuf
dc6bc6b is described below

commit dc6bc6b6381a9b6c7dc9b4b94d02c542c5407751
Author: Dan Smith <up...@apache.org>
AuthorDate: Wed May 16 14:40:33 2018 -0700

    Adding a hacky feature flag to use lz4 compression with protobuf
    
    Adding a flag to compress all traffic after the initial handshake with
    lz4 streaming compression.
---
 .../tier/sockets/ProtobufServerConnection.java     |  2 --
 geode-experimental-driver/build.gradle             |  2 +-
 .../geode/experimental/driver/ProtobufChannel.java | 21 ++++++++++++++++----
 geode-protobuf/build.gradle                        |  1 +
 .../protobuf/v1/ProtobufStreamProcessor.java       | 23 ++++++++++++++++++++++
 5 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
index bd65271..064df10 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
@@ -71,8 +71,6 @@ public class ProtobufServerConnection extends ServerConnection {
       InternalCache cache = getCache();
       cache.setReadSerializedForCurrentThread(true);
       try {
-        protocolProcessor.processMessage(input, output);
-        output.flush();
         try {
           protocolProcessor.processMessage(input, output);
         } finally {
diff --git a/geode-experimental-driver/build.gradle b/geode-experimental-driver/build.gradle
index 1a4badc..3aa262f 100644
--- a/geode-experimental-driver/build.gradle
+++ b/geode-experimental-driver/build.gradle
@@ -18,7 +18,7 @@
 dependencies {
     compile project(':geode-common')
     compile project(':geode-protobuf-messages')
-
+    compile group: 'org.lz4', name: 'lz4-java', version: '1.4.1'
     compile 'com.google.protobuf:protobuf-java:' + project.'protobuf-java.version'
     testCompile project(':geode-core')
     testCompile project(':geode-junit')
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
index 1e5d239..97e034c 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
@@ -26,6 +26,10 @@ import java.security.GeneralSecurityException;
 import java.util.Objects;
 import java.util.Set;
 
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream.BLOCKSIZE;
+
 import org.apache.geode.internal.protocol.protobuf.ProtocolVersion;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
@@ -36,13 +40,15 @@ import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
 
 class ProtobufChannel {
+
+  public static final boolean USE_LZ4 = Boolean.getBoolean("gemfire.PROTOBUF_USE_LZ4");
   /**
    * Socket to a GemFire server that has Protobuf enabled.
    */
   final Socket socket;
-  final BufferedOutputStream output;
+  final OutputStream output;
   private final ValueSerializer serializer;
-  private final BufferedInputStream input;
+  private final InputStream input;
 
   public ProtobufChannel(final Set<InetSocketAddress> locators, String username, String password,
       String keyStorePath, String trustStorePath, String protocols, String ciphers,
@@ -50,8 +56,15 @@ class ProtobufChannel {
     this.serializer = serializer;
     socket = connectToAServer(locators, username, password, keyStorePath, trustStorePath, protocols,
         ciphers);
-    output = new BufferedOutputStream(socket.getOutputStream(), socket.getSendBufferSize());
-    input = new BufferedInputStream(socket.getInputStream(), socket.getReceiveBufferSize());
+
+
+    if (USE_LZ4) {
+      output = new LZ4FrameOutputStream(socket.getOutputStream(), BLOCKSIZE.SIZE_64KB);
+      input = new LZ4FrameInputStream(socket.getInputStream());
+    } else {
+      output = new BufferedOutputStream(socket.getOutputStream(), socket.getSendBufferSize());
+      input = new BufferedInputStream(socket.getInputStream(), socket.getReceiveBufferSize());
+    }
   }
 
   public void close() throws IOException {
diff --git a/geode-protobuf/build.gradle b/geode-protobuf/build.gradle
index 9bafc80..e9ff112 100644
--- a/geode-protobuf/build.gradle
+++ b/geode-protobuf/build.gradle
@@ -27,6 +27,7 @@ dependencies {
     testCompile 'org.powermock:powermock-api-mockito:' + project.'powermock.version'
 
     compile 'com.google.protobuf:protobuf-java:' + project.'protobuf-java.version'
+    compile group: 'org.lz4', name: 'lz4-java', version: '1.4.1'
     testCompile files(project(':geode-old-versions').sourceSets.main.output)
     testCompile 'com.pholser:junit-quickcheck-core:' + project.'junit-quickcheck.version'
     testCompile 'com.pholser:junit-quickcheck-generators:' + project.'junit-quickcheck.version'
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
index 0960e33..f0ad7b3 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
@@ -19,6 +19,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream.BLOCKSIZE;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
@@ -27,6 +30,7 @@ import org.apache.geode.internal.protocol.protobuf.statistics.ClientStatistics;
 import org.apache.geode.internal.protocol.protobuf.v1.registry.ProtobufOperationContextRegistry;
 import org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer;
 import org.apache.geode.internal.protocol.protobuf.v1.serializer.exception.InvalidProtocolMessageException;
+import org.apache.geode.internal.protocol.protobuf.v1.state.AcceptMessages;
 
 /**
  * This object handles an incoming stream containing protobuf messages. It parses the protobuf
@@ -35,10 +39,14 @@ import org.apache.geode.internal.protocol.protobuf.v1.serializer.exception.Inval
  */
 @Experimental
 public class ProtobufStreamProcessor {
+  public static final boolean USE_LZ4 = Boolean.getBoolean("gemfire.PROTOBUF_USE_LZ4");
   private final ProtobufProtocolSerializer protobufProtocolSerializer;
   private final ProtobufOpsProcessor protobufOpsProcessor;
   private static final Logger logger = LogService.getLogger();
 
+  private InputStream compressionInputStream;
+  private OutputStream compressionOutputStream;
+
   public ProtobufStreamProcessor() {
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
     protobufOpsProcessor = new ProtobufOpsProcessor(new ProtobufOperationContextRegistry());
@@ -46,12 +54,19 @@ public class ProtobufStreamProcessor {
 
   public void receiveMessage(InputStream inputStream, OutputStream outputStream,
       MessageExecutionContext executionContext) throws IOException {
+    if (this.compressionInputStream != null) {
+      inputStream = compressionInputStream;
+    }
+    if (this.compressionOutputStream != null) {
+      outputStream = compressionOutputStream;
+    }
     try {
       processOneMessage(inputStream, outputStream, executionContext);
     } catch (InvalidProtocolMessageException e) {
       logger.info("Invalid message", e);
       throw new IOException(e);
     }
+
   }
 
   private void processOneMessage(InputStream inputStream, OutputStream outputStream,
@@ -74,5 +89,13 @@ public class ProtobufStreamProcessor {
     ClientProtocol.Message response = protobufOpsProcessor.process(message, executionContext);
     statistics.messageSent(response.getSerializedSize());
     protobufProtocolSerializer.serialize(response, outputStream);
+    outputStream.flush();
+
+    if (message.hasHandshakeRequest() && compressionInputStream == null
+        && (executionContext.getConnectionState() instanceof AcceptMessages) && USE_LZ4
+        && !(executionContext instanceof LocatorMessageExecutionContext)) {
+      compressionOutputStream = new LZ4FrameOutputStream(outputStream, BLOCKSIZE.SIZE_64KB);
+      compressionInputStream = new LZ4FrameInputStream(inputStream);
+    }
   }
 }

-- 
To stop receiving notification emails like this one, please contact
upthewaterspout@apache.org.