You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2018/05/16 21:41:41 UTC
[geode] branch feature/transcoding_experiments updated: Adding a
hacky feature flag to use lz4 compression with protobuf
This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch feature/transcoding_experiments
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/transcoding_experiments by this push:
new dc6bc6b Adding a hacky feature flag to use lz4 compression with protobuf
dc6bc6b is described below
commit dc6bc6b6381a9b6c7dc9b4b94d02c542c5407751
Author: Dan Smith <up...@apache.org>
AuthorDate: Wed May 16 14:40:33 2018 -0700
Adding a hacky feature flag to use lz4 compression with protobuf
Adding a flag to compress all traffic after the initial handshake with
lz4 streaming compression.
---
.../tier/sockets/ProtobufServerConnection.java | 2 --
geode-experimental-driver/build.gradle | 2 +-
.../geode/experimental/driver/ProtobufChannel.java | 21 ++++++++++++++++----
geode-protobuf/build.gradle | 1 +
.../protobuf/v1/ProtobufStreamProcessor.java | 23 ++++++++++++++++++++++
5 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
index bd65271..064df10 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
@@ -71,8 +71,6 @@ public class ProtobufServerConnection extends ServerConnection {
InternalCache cache = getCache();
cache.setReadSerializedForCurrentThread(true);
try {
- protocolProcessor.processMessage(input, output);
- output.flush();
try {
protocolProcessor.processMessage(input, output);
} finally {
diff --git a/geode-experimental-driver/build.gradle b/geode-experimental-driver/build.gradle
index 1a4badc..3aa262f 100644
--- a/geode-experimental-driver/build.gradle
+++ b/geode-experimental-driver/build.gradle
@@ -18,7 +18,7 @@
dependencies {
compile project(':geode-common')
compile project(':geode-protobuf-messages')
-
+ compile group: 'org.lz4', name: 'lz4-java', version: '1.4.1'
compile 'com.google.protobuf:protobuf-java:' + project.'protobuf-java.version'
testCompile project(':geode-core')
testCompile project(':geode-junit')
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
index 1e5d239..97e034c 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
@@ -26,6 +26,10 @@ import java.security.GeneralSecurityException;
import java.util.Objects;
import java.util.Set;
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream.BLOCKSIZE;
+
import org.apache.geode.internal.protocol.protobuf.ProtocolVersion;
import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
@@ -36,13 +40,15 @@ import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
class ProtobufChannel {
+
+ public static final boolean USE_LZ4 = Boolean.getBoolean("gemfire.PROTOBUF_USE_LZ4");
/**
* Socket to a GemFire server that has Protobuf enabled.
*/
final Socket socket;
- final BufferedOutputStream output;
+ final OutputStream output;
private final ValueSerializer serializer;
- private final BufferedInputStream input;
+ private final InputStream input;
public ProtobufChannel(final Set<InetSocketAddress> locators, String username, String password,
String keyStorePath, String trustStorePath, String protocols, String ciphers,
@@ -50,8 +56,15 @@ class ProtobufChannel {
this.serializer = serializer;
socket = connectToAServer(locators, username, password, keyStorePath, trustStorePath, protocols,
ciphers);
- output = new BufferedOutputStream(socket.getOutputStream(), socket.getSendBufferSize());
- input = new BufferedInputStream(socket.getInputStream(), socket.getReceiveBufferSize());
+
+
+ if (USE_LZ4) {
+ output = new LZ4FrameOutputStream(socket.getOutputStream(), BLOCKSIZE.SIZE_64KB);
+ input = new LZ4FrameInputStream(socket.getInputStream());
+ } else {
+ output = new BufferedOutputStream(socket.getOutputStream(), socket.getSendBufferSize());
+ input = new BufferedInputStream(socket.getInputStream(), socket.getReceiveBufferSize());
+ }
}
public void close() throws IOException {
diff --git a/geode-protobuf/build.gradle b/geode-protobuf/build.gradle
index 9bafc80..e9ff112 100644
--- a/geode-protobuf/build.gradle
+++ b/geode-protobuf/build.gradle
@@ -27,6 +27,7 @@ dependencies {
testCompile 'org.powermock:powermock-api-mockito:' + project.'powermock.version'
compile 'com.google.protobuf:protobuf-java:' + project.'protobuf-java.version'
+ compile group: 'org.lz4', name: 'lz4-java', version: '1.4.1'
testCompile files(project(':geode-old-versions').sourceSets.main.output)
testCompile 'com.pholser:junit-quickcheck-core:' + project.'junit-quickcheck.version'
testCompile 'com.pholser:junit-quickcheck-generators:' + project.'junit-quickcheck.version'
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
index 0960e33..f0ad7b3 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufStreamProcessor.java
@@ -19,6 +19,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream.BLOCKSIZE;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.Experimental;
@@ -27,6 +30,7 @@ import org.apache.geode.internal.protocol.protobuf.statistics.ClientStatistics;
import org.apache.geode.internal.protocol.protobuf.v1.registry.ProtobufOperationContextRegistry;
import org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer;
import org.apache.geode.internal.protocol.protobuf.v1.serializer.exception.InvalidProtocolMessageException;
+import org.apache.geode.internal.protocol.protobuf.v1.state.AcceptMessages;
/**
* This object handles an incoming stream containing protobuf messages. It parses the protobuf
@@ -35,10 +39,14 @@ import org.apache.geode.internal.protocol.protobuf.v1.serializer.exception.Inval
*/
@Experimental
public class ProtobufStreamProcessor {
+ public static final boolean USE_LZ4 = Boolean.getBoolean("gemfire.PROTOBUF_USE_LZ4");
private final ProtobufProtocolSerializer protobufProtocolSerializer;
private final ProtobufOpsProcessor protobufOpsProcessor;
private static final Logger logger = LogService.getLogger();
+ private InputStream compressionInputStream;
+ private OutputStream compressionOutputStream;
+
public ProtobufStreamProcessor() {
protobufProtocolSerializer = new ProtobufProtocolSerializer();
protobufOpsProcessor = new ProtobufOpsProcessor(new ProtobufOperationContextRegistry());
@@ -46,12 +54,19 @@ public class ProtobufStreamProcessor {
public void receiveMessage(InputStream inputStream, OutputStream outputStream,
MessageExecutionContext executionContext) throws IOException {
+ if (this.compressionInputStream != null) {
+ inputStream = compressionInputStream;
+ }
+ if (this.compressionOutputStream != null) {
+ outputStream = compressionOutputStream;
+ }
try {
processOneMessage(inputStream, outputStream, executionContext);
} catch (InvalidProtocolMessageException e) {
logger.info("Invalid message", e);
throw new IOException(e);
}
+
}
private void processOneMessage(InputStream inputStream, OutputStream outputStream,
@@ -74,5 +89,13 @@ public class ProtobufStreamProcessor {
ClientProtocol.Message response = protobufOpsProcessor.process(message, executionContext);
statistics.messageSent(response.getSerializedSize());
protobufProtocolSerializer.serialize(response, outputStream);
+ outputStream.flush();
+
+ if (message.hasHandshakeRequest() && compressionInputStream == null
+ && (executionContext.getConnectionState() instanceof AcceptMessages) && USE_LZ4
+ && !(executionContext instanceof LocatorMessageExecutionContext)) {
+ compressionOutputStream = new LZ4FrameOutputStream(outputStream, BLOCKSIZE.SIZE_64KB);
+ compressionInputStream = new LZ4FrameInputStream(inputStream);
+ }
}
}
--
To stop receiving notification emails like this one, please contact
upthewaterspout@apache.org.