You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/03/03 18:30:48 UTC
[4/7] calcite git commit: [CALCITE-1094] Replace
ByteArrayOutputStream to avoid synchronized writes
[CALCITE-1094] Replace ByteArrayOutputStream to avoid synchronized writes
Pull in the ZeroCopyByteString class. We can cap the amount of byte[]'s
that we are making by providing a buffer to the current thread. This also
avoids any synchronization on a typical object pool.
Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/1d3a26df
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/1d3a26df
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/1d3a26df
Branch: refs/heads/master
Commit: 1d3a26dfac17fea458402a637449007dc095bced
Parents: 0de38aa
Author: Josh Elser <el...@apache.org>
Authored: Wed Mar 2 17:43:41 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Wed Mar 2 23:26:02 2016 -0500
----------------------------------------------------------------------
.../avatica/server/AvaticaJsonHandler.java | 16 +-
.../avatica/server/AvaticaProtobufHandler.java | 20 ++-
.../protobuf/HBaseZeroCopyByteString.java | 77 ++++++++++
.../java/com/google/protobuf/package-info.java | 26 ++++
.../apache/calcite/avatica/AvaticaUtils.java | 47 ++++--
.../avatica/remote/ProtobufTranslation.java | 10 +-
.../avatica/remote/ProtobufTranslationImpl.java | 146 +++++++++++++++---
.../avatica/remote/RequestTranslator.java | 3 +-
.../apache/calcite/avatica/remote/Service.java | 8 +-
.../calcite/avatica/remote/TypedValue.java | 3 +-
.../avatica/util/UnsynchronizedBuffer.java | 152 +++++++++++++++++++
.../remote/ProtobufSerializationTest.java | 75 +++++++++
.../avatica/util/UnsynchronizedBufferTest.java | 41 +++++
13 files changed, 578 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
index 703a2c3..34a9333 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
@@ -25,6 +25,7 @@ import org.apache.calcite.avatica.remote.Handler.HandlerResponse;
import org.apache.calcite.avatica.remote.JsonHandler;
import org.apache.calcite.avatica.remote.Service;
import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -54,6 +55,8 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA
final MetricsSystem metrics;
final Timer requestTimer;
+ final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer;
+
public AvaticaJsonHandler(Service service) {
this(service, NoopMetricsSystem.getInstance());
}
@@ -67,6 +70,12 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA
// Metrics
this.requestTimer = this.metrics.getTimer(
concat(AvaticaJsonHandler.class, MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME));
+
+ this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() {
+ @Override public UnsynchronizedBuffer initialValue() {
+ return new UnsynchronizedBuffer();
+ }
+ };
}
public void handle(String target, Request baseRequest,
@@ -80,8 +89,13 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA
// The latter allows very large requests without hitting HTTP 413.
String rawRequest = request.getHeader("request");
if (rawRequest == null) {
+ // Avoid a new buffer creation for every HTTP request
+ final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
try (ServletInputStream inputStream = request.getInputStream()) {
- rawRequest = AvaticaUtils.readFully(inputStream);
+ rawRequest = AvaticaUtils.readFully(inputStream, buffer);
+ } finally {
+ // Reset the offset into the buffer after we're done
+ buffer.reset();
}
}
final String jsonRequest =
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
index aeebad7..27e73de 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
@@ -28,6 +28,7 @@ import org.apache.calcite.avatica.remote.ProtobufTranslation;
import org.apache.calcite.avatica.remote.ProtobufTranslationImpl;
import org.apache.calcite.avatica.remote.Service;
import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -55,19 +56,28 @@ public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAw
private final MetricsSystem metrics;
private final Timer requestTimer;
+ final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer;
+
public AvaticaProtobufHandler(Service service) {
this(service, NoopMetricsSystem.getInstance());
}
public AvaticaProtobufHandler(Service service, MetricsSystem metrics) {
- this.protobufTranslation = new ProtobufTranslationImpl();
this.service = Objects.requireNonNull(service);
this.metrics = Objects.requireNonNull(metrics);
- this.pbHandler = new ProtobufHandler(service, protobufTranslation, metrics);
this.requestTimer = this.metrics.getTimer(
MetricsHelper.concat(AvaticaProtobufHandler.class,
MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME));
+
+ this.protobufTranslation = new ProtobufTranslationImpl();
+ this.pbHandler = new ProtobufHandler(service, protobufTranslation, metrics);
+
+ this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() {
+ @Override public UnsynchronizedBuffer initialValue() {
+ return new UnsynchronizedBuffer();
+ }
+ };
}
public void handle(String target, Request baseRequest,
@@ -78,8 +88,12 @@ public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAw
response.setStatus(HttpServletResponse.SC_OK);
if (request.getMethod().equals("POST")) {
byte[] requestBytes;
+ // Avoid a new buffer creation for every HTTP request
+ final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
try (ServletInputStream inputStream = request.getInputStream()) {
- requestBytes = AvaticaUtils.readFullyToBytes(inputStream);
+ requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer);
+ } finally {
+ buffer.reset();
}
HandlerResponse<byte[]> handlerResponse = pbHandler.apply(requestBytes);
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java b/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java
new file mode 100644
index 0000000..62c4dd2
--- /dev/null
+++ b/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.protobuf;
+
+/**
+ * Helper class to extract byte arrays from {@link ByteString} without copy.
+ *
+ * Without this protobufs would force us to copy every single byte array out
+ * of the objects de-serialized from the wire (which already do one copy, on
+ * top of the copies the JVM does to go from kernel buffer to C buffer and
+ * from C buffer to JVM buffer).
+ *
+ * Graciously copied from Apache HBase.
+ */
+public final class HBaseZeroCopyByteString extends LiteralByteString {
+ // Gotten from AsyncHBase code base with permission.
+ /** Private constructor so this class cannot be instantiated. */
+ private HBaseZeroCopyByteString() {
+ super(null);
+ throw new UnsupportedOperationException("Should never be here.");
+ }
+
+ /**
+ * Wraps a byte array in a {@link ByteString} without copying it.
+ *
+ * @param array The byte array to wrap
+ * @return a ByteString wrapping the <code>array</code>
+ */
+ public static ByteString wrap(final byte[] array) {
+ return new LiteralByteString(array);
+ }
+
+ /**
+ * Wraps a subset of a byte array in a {@link ByteString} without copying it.
+ *
+ * @param array The byte array to wrap
+ * @param offset the start of data in the array
+ * @param length The number of bytes of data at <code>offset</code>
+ * @return a ByteString wrapping the <code>array</code>
+ */
+ public static ByteString wrap(final byte[] array, int offset, int length) {
+ return new BoundedByteString(array, offset, length);
+ }
+
+
+ /**
+ * Extracts the byte array from the given {@link ByteString} without copy.
+ * @param buf A buffer from which to extract the array. This buffer must be
+ * actually an instance of a {@code LiteralByteString}.
+ *
+ * @param buf <code>ByteString</code> to access
+ * @return The underlying byte array of the ByteString
+ */
+ public static byte[] zeroCopyGetBytes(final ByteString buf) {
+ if (buf instanceof LiteralByteString) {
+ return ((LiteralByteString) buf).bytes;
+ }
+ throw new UnsupportedOperationException("Need a LiteralByteString, got a "
+ + buf.getClass().getName());
+ }
+}
+
+// End HBaseZeroCopyByteString.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/com/google/protobuf/package-info.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/com/google/protobuf/package-info.java b/avatica/src/main/java/com/google/protobuf/package-info.java
new file mode 100644
index 0000000..92f110e
--- /dev/null
+++ b/avatica/src/main/java/com/google/protobuf/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Avatica-custom classes to access protected classes in Google Protobuf.
+ */
+@PackageMarker
+package com.google.protobuf;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
index 9382f87..a999f19 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
@@ -16,13 +16,15 @@
*/
package org.apache.calcite.avatica;
-import java.io.ByteArrayOutputStream;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
+
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.AbstractList;
@@ -45,6 +47,12 @@ public class AvaticaUtils {
private static final Set<String> UNIQUE_STRINGS = new HashSet<>();
+ private static final ThreadLocal<byte[]> PER_THREAD_BUFFER = new ThreadLocal<byte[]>() {
+ @Override protected byte[] initialValue() {
+ return new byte[4096];
+ }
+ };
+
private AvaticaUtils() {}
static {
@@ -200,25 +208,46 @@ public class AvaticaUtils {
/** Reads the contents of an input stream and returns as a string. */
public static String readFully(InputStream inputStream) throws IOException {
- return _readFully(inputStream).toString();
+ return readFully(inputStream, new UnsynchronizedBuffer(1024));
}
+ /** Reads the contents of an input stream and returns as a string. */
+ public static String readFully(InputStream inputStream, UnsynchronizedBuffer buffer)
+ throws IOException {
+ // Variant that lets us use a pooled Buffer
+ final byte[] bytes = _readFully(inputStream, buffer);
+ return new String(bytes, 0, bytes.length, StandardCharsets.UTF_8);
+ }
+
+ /** Reads the contents of an input stream and returns as a string. */
public static byte[] readFullyToBytes(InputStream inputStream) throws IOException {
- return _readFully(inputStream).toByteArray();
+ return readFullyToBytes(inputStream, new UnsynchronizedBuffer(1024));
}
- /** Reads the contents of an input stream and returns a ByteArrayOutputStrema. */
- static ByteArrayOutputStream _readFully(InputStream inputStream) throws IOException {
- final byte[] bytes = new byte[4096];
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ /** Reads the contents of an input stream and returns as a string. */
+ public static byte[] readFullyToBytes(InputStream inputStream, UnsynchronizedBuffer buffer)
+ throws IOException {
+ // Variant that lets us use a pooled Buffer
+ return _readFully(inputStream, buffer);
+ }
+
+ /**
+ * Reads the contents of an input stream and returns a byte array.
+ *
+ * @param inputStream the input to read from.
+ * @return A byte array whose length is equal to the number of bytes contained.
+ */
+ static byte[] _readFully(InputStream inputStream, UnsynchronizedBuffer buffer)
+ throws IOException {
+ final byte[] bytes = PER_THREAD_BUFFER.get();
for (;;) {
int count = inputStream.read(bytes, 0, bytes.length);
if (count < 0) {
break;
}
- baos.write(bytes, 0, count);
+ buffer.write(bytes, 0, count);
}
- return baos;
+ return buffer.toArray();
}
/** Invokes {@code Statement#setLargeMaxRows}, falling back on
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
index acb82db..7142d59 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
@@ -19,8 +19,6 @@ package org.apache.calcite.avatica.remote;
import org.apache.calcite.avatica.remote.Service.Request;
import org.apache.calcite.avatica.remote.Service.Response;
-import com.google.protobuf.InvalidProtocolBufferException;
-
import java.io.IOException;
/**
@@ -49,18 +47,18 @@ public interface ProtobufTranslation {
*
* @param bytes Serialized protocol buffer request from client
* @return A Request object for the given bytes
- * @throws InvalidProtocolBufferException If the protocol buffer cannot be deserialized
+ * @throws IOException If the protocol buffer cannot be deserialized
*/
- Request parseRequest(byte[] bytes) throws InvalidProtocolBufferException;
+ Request parseRequest(byte[] bytes) throws IOException;
/**
* Parses a serialized protocol buffer response into a {@link Response}.
*
* @param bytes Serialized protocol buffer request from server
* @return The Response object for the given bytes
- * @throws InvalidProtocolBufferException If the protocol buffer cannot be deserialized
+ * @throws IOException If the protocol buffer cannot be deserialized
*/
- Response parseResponse(byte[] bytes) throws InvalidProtocolBufferException;
+ Response parseResponse(byte[] bytes) throws IOException;
}
// End ProtobufTranslation.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
index 646d706..80d2b22 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
@@ -54,17 +54,22 @@ import org.apache.calcite.avatica.proto.Responses.SyncResultsResponse;
import org.apache.calcite.avatica.remote.Service.Request;
import org.apache.calcite.avatica.remote.Service.Response;
import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.Message;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Implementation of {@link ProtobufTranslationImpl} that translates
@@ -75,9 +80,10 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
// Extremely ugly mapping of PB class name into a means to convert it to the POJO
private static final Map<String, RequestTranslator> REQUEST_PARSERS;
private static final Map<String, ResponseTranslator> RESPONSE_PARSERS;
+ private static final Map<Class<?>, ByteString> MESSAGE_CLASSES;
static {
- HashMap<String, RequestTranslator> reqParsers = new HashMap<>();
+ Map<String, RequestTranslator> reqParsers = new ConcurrentHashMap<>();
reqParsers.put(CatalogsRequest.class.getName(),
new RequestTranslator(CatalogsRequest.parser(), new Service.CatalogsRequest()));
reqParsers.put(OpenConnectionRequest.class.getName(),
@@ -123,7 +129,7 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers);
- HashMap<String, ResponseTranslator> respParsers = new HashMap<>();
+ Map<String, ResponseTranslator> respParsers = new ConcurrentHashMap<>();
respParsers.put(OpenConnectionResponse.class.getName(),
new ResponseTranslator(OpenConnectionResponse.parser(),
new Service.OpenConnectionResponse()));
@@ -162,8 +168,65 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
new ResponseTranslator(RollbackResponse.parser(), new Service.RollbackResponse()));
RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers);
+
+ Map<Class<?>, ByteString> messageClassNames = new ConcurrentHashMap<>();
+ for (Class<?> msgClz : getAllMessageClasses()) {
+ messageClassNames.put(msgClz, wrapClassName(msgClz));
+ }
+ MESSAGE_CLASSES = Collections.unmodifiableMap(messageClassNames);
+ }
+
+ private static List<Class<?>> getAllMessageClasses() {
+ List<Class<?>> messageClasses = new ArrayList<>();
+ messageClasses.add(CatalogsRequest.class);
+ messageClasses.add(CloseConnectionRequest.class);
+ messageClasses.add(CloseStatementRequest.class);
+ messageClasses.add(ColumnsRequest.class);
+ messageClasses.add(CommitRequest.class);
+ messageClasses.add(ConnectionSyncRequest.class);
+ messageClasses.add(CreateStatementRequest.class);
+ messageClasses.add(DatabasePropertyRequest.class);
+ messageClasses.add(ExecuteRequest.class);
+ messageClasses.add(FetchRequest.class);
+ messageClasses.add(OpenConnectionRequest.class);
+ messageClasses.add(PrepareAndExecuteRequest.class);
+ messageClasses.add(PrepareRequest.class);
+ messageClasses.add(RollbackRequest.class);
+ messageClasses.add(SchemasRequest.class);
+ messageClasses.add(SyncResultsRequest.class);
+ messageClasses.add(TableTypesRequest.class);
+ messageClasses.add(TablesRequest.class);
+ messageClasses.add(TypeInfoRequest.class);
+ messageClasses.add(CloseConnectionResponse.class);
+ messageClasses.add(CloseStatementResponse.class);
+ messageClasses.add(CommitResponse.class);
+ messageClasses.add(ConnectionSyncResponse.class);
+ messageClasses.add(CreateStatementResponse.class);
+ messageClasses.add(DatabasePropertyResponse.class);
+ messageClasses.add(ErrorResponse.class);
+ messageClasses.add(ExecuteResponse.class);
+ messageClasses.add(FetchResponse.class);
+ messageClasses.add(OpenConnectionResponse.class);
+ messageClasses.add(PrepareResponse.class);
+ messageClasses.add(ResultSetResponse.class);
+ messageClasses.add(RollbackResponse.class);
+ messageClasses.add(RpcMetadata.class);
+ messageClasses.add(SyncResultsResponse.class);
+
+ return messageClasses;
+ }
+
+ private static ByteString wrapClassName(Class<?> clz) {
+ return HBaseZeroCopyByteString.wrap(clz.getName().getBytes(UTF_8));
}
+ private final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer =
+ new ThreadLocal<UnsynchronizedBuffer>() {
+ @Override protected UnsynchronizedBuffer initialValue() {
+ return new UnsynchronizedBuffer();
+ }
+ };
+
/**
* Fetches the concrete message's Parser implementation.
*
@@ -207,42 +270,79 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
}
@Override public byte[] serializeResponse(Response response) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- Message responseMsg = response.serialize();
- serializeMessage(out, responseMsg);
- return out.toByteArray();
+ // Avoid BAOS for its synchronized write methods, we don't need that concurrency control
+ UnsynchronizedBuffer out = threadLocalBuffer.get();
+ try {
+ Message responseMsg = response.serialize();
+ serializeMessage(out, responseMsg);
+ return out.toArray();
+ } finally {
+ out.reset();
+ }
}
@Override public byte[] serializeRequest(Request request) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- Message requestMsg = request.serialize();
- serializeMessage(out, requestMsg);
- return out.toByteArray();
+ // Avoid BAOS for its synchronized write methods, we don't need that concurrency control
+ UnsynchronizedBuffer out = threadLocalBuffer.get();
+ try {
+ Message requestMsg = request.serialize();
+ serializeMessage(out, requestMsg);
+ return out.toArray();
+ } finally {
+ out.reset();
+ }
}
void serializeMessage(OutputStream out, Message msg) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- msg.writeTo(baos);
+ // Serialize the protobuf message
+ UnsynchronizedBuffer buffer = threadLocalBuffer.get();
+ ByteString serializedMsg;
+ try {
+ msg.writeTo(buffer);
+ // Make a bytestring from it
+ serializedMsg = HBaseZeroCopyByteString.wrap(buffer.toArray());
+ } finally {
+ buffer.reset();
+ }
- // TODO Using ByteString is copying the bytes of the message which sucks. Could try to
- // lift the ZeroCopy implementation from HBase.
- WireMessage wireMsg = WireMessage.newBuilder().setName(msg.getClass().getName()).
- setWrappedMessage(ByteString.copyFrom(baos.toByteArray())).build();
+ // Wrap the serialized message in a WireMessage
+ WireMessage wireMsg = WireMessage.newBuilder().setNameBytes(getClassNameBytes(msg.getClass()))
+ .setWrappedMessage(serializedMsg).build();
+ // Write the WireMessage to the provided OutputStream
wireMsg.writeTo(out);
}
- @Override public Request parseRequest(byte[] bytes) throws InvalidProtocolBufferException {
- WireMessage wireMsg = WireMessage.parseFrom(bytes);
+ ByteString getClassNameBytes(Class<?> clz) {
+ ByteString byteString = MESSAGE_CLASSES.get(clz);
+ if (null == byteString) {
+ throw new IllegalArgumentException("Missing ByteString for " + clz.getName());
+ }
+ return byteString;
+ }
+
+ @Override public Request parseRequest(byte[] bytes) throws IOException {
+ ByteString byteString = HBaseZeroCopyByteString.wrap(bytes);
+ CodedInputStream inputStream = byteString.newCodedInput();
+ // Enable aliasing to avoid an extra copy to get at the serialized Request inside of the
+ // WireMessage.
+ inputStream.enableAliasing(true);
+ WireMessage wireMsg = WireMessage.parseFrom(inputStream);
String serializedMessageClassName = wireMsg.getName();
RequestTranslator translator = getParserForRequest(serializedMessageClassName);
+ // The ByteString should be logical offsets into the original byte array
return translator.transform(wireMsg.getWrappedMessage());
}
- @Override public Response parseResponse(byte[] bytes) throws InvalidProtocolBufferException {
- WireMessage wireMsg = WireMessage.parseFrom(bytes);
+ @Override public Response parseResponse(byte[] bytes) throws IOException {
+ ByteString byteString = HBaseZeroCopyByteString.wrap(bytes);
+ CodedInputStream inputStream = byteString.newCodedInput();
+ // Enable aliasing to avoid an extra copy to get at the serialized Response inside of the
+ // WireMessage.
+ inputStream.enableAliasing(true);
+ WireMessage wireMsg = WireMessage.parseFrom(inputStream);
String serializedMessageClassName = wireMsg.getName();
ResponseTranslator translator = getParserForResponse(serializedMessageClassName);
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
index 0dadc78..417c6ed 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
@@ -36,7 +36,8 @@ public class RequestTranslator {
public Service.Request transform(ByteString serializedMessage) throws
InvalidProtocolBufferException {
- Message msg = parser.parseFrom(serializedMessage);
+ // This should already be an aliased CodedInputStream from the WireMessage parsing.
+ Message msg = parser.parseFrom(serializedMessage.newCodedInput());
return impl.deserialize(msg);
}
}
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index aee5b29..5790848 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -31,8 +31,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
+import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.Message;
import java.io.PrintWriter;
@@ -2549,13 +2550,16 @@ public interface Service {
private static final FieldDescriptor SERVER_ADDRESS_DESCRIPTOR = Responses.RpcMetadata
.getDescriptor().findFieldByNumber(Responses.RpcMetadata.SERVER_ADDRESS_FIELD_NUMBER);
public final String serverAddress;
+ private final ByteString serverAddressAsBytes;
public RpcMetadataResponse() {
this.serverAddress = null;
+ this.serverAddressAsBytes = null;
}
public RpcMetadataResponse(@JsonProperty("serverAddress") String serverAddress) {
this.serverAddress = serverAddress;
+ this.serverAddressAsBytes = HBaseZeroCopyByteString.wrap(serverAddress.getBytes());
}
@Override RpcMetadataResponse deserialize(Message genericMsg) {
@@ -2566,7 +2570,7 @@ public interface Service {
}
@Override Responses.RpcMetadata serialize() {
- return Responses.RpcMetadata.newBuilder().setServerAddress(serverAddress).build();
+ return Responses.RpcMetadata.newBuilder().setServerAddressBytes(serverAddressAsBytes).build();
}
static RpcMetadataResponse fromProto(Responses.RpcMetadata msg) {
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java
index 5c80816..d96293b 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java
@@ -23,6 +23,7 @@ import org.apache.calcite.avatica.util.DateTimeUtils;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.protobuf.HBaseZeroCopyByteString;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -345,7 +346,7 @@ public class TypedValue {
break;
case BYTE_STRING:
case STRING:
- builder.setStringValue((String) value);
+ builder.setStringValueBytes(HBaseZeroCopyByteString.wrap(((String) value).getBytes()));
break;
case PRIMITIVE_CHAR:
case CHARACTER:
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java b/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java
new file mode 100644
index 0000000..8daee60
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A utility class for reading and writing bytes to byte buffers without synchronization. A
+ * reduced variant taken from Apache Accumulo. This class is <b>not</b> thread-safe by design.
+ * It is up to the caller to guarantee mutual exclusion as necessary.
+ */
+public class UnsynchronizedBuffer extends OutputStream {
+ // Anything larger than 64K, reap the backing buffer
+ private static final int LARGE_BUFFER_SIZE = 1024 * 64;
+
+ final int initialCapacity;
+ int offset = 0;
+ byte[] data;
+
+ /**
+ * Creates a new writer.
+ */
+ public UnsynchronizedBuffer() {
+ this(4096);
+ }
+
+ /**
+ * Creates a new writer.
+ *
+ * @param initialCapacity initial byte capacity
+ */
+ public UnsynchronizedBuffer(int initialCapacity) {
+ this.initialCapacity = initialCapacity;
+ data = new byte[initialCapacity];
+ }
+
+ private void reserve(int l) {
+ if (offset + l > data.length) {
+ int newSize = UnsynchronizedBuffer.nextArraySize(offset + l);
+
+ byte[] newData = new byte[newSize];
+ System.arraycopy(data, 0, newData, 0, offset);
+ data = newData;
+ }
+
+ }
+
+ /**
+ * Adds bytes to this writer's buffer.
+ *
+ * @param bytes byte array
+ * @param off offset into array to start copying bytes
+ * @param length number of bytes to add
+ * @throws IndexOutOfBoundsException if off or length are invalid
+ */
+ public void write(byte[] bytes, int off, int length) {
+ reserve(length);
+ System.arraycopy(bytes, off, data, offset, length);
+ offset += length;
+ }
+
+ @Override public void write(int b) throws IOException {
+ reserve(1);
+ data[offset] = (byte) b;
+ offset++;
+ }
+
+ /**
+ * Gets (a copy of) the contents of this writer's buffer.
+ *
+ * @return byte buffer contents
+ */
+ public byte[] toArray() {
+ byte[] ret = new byte[offset];
+ System.arraycopy(data, 0, ret, 0, offset);
+ return ret;
+ }
+
+ /**
+ * Resets the internal pointer into the buffer.
+ */
+ public void reset() {
+ offset = 0;
+ if (data.length >= LARGE_BUFFER_SIZE) {
+ data = new byte[this.initialCapacity];
+ }
+ }
+
+ /**
+ * @return The current offset into the backing array.
+ */
+ public int getOffset() {
+ return offset;
+ }
+
+ /**
+ * @return The current length of the backing array.
+ */
+ public long getSize() {
+ return data.length;
+ }
+
+ /**
+ * Determines what next array size should be by rounding up to next power of two.
+ *
+ * @param i current array size
+ * @return next array size
+ * @throws IllegalArgumentException if i is negative
+ */
+ public static int nextArraySize(int i) {
+ if (i < 0) {
+ throw new IllegalArgumentException();
+ }
+
+ if (i > (1 << 30)) {
+ return Integer.MAX_VALUE; // this is the next power of 2 minus one... a special case
+ }
+
+ if (i == 0) {
+ return 1;
+ }
+
+ // round up to next power of two
+ int ret = i;
+ ret--;
+ ret |= ret >> 1;
+ ret |= ret >> 2;
+ ret |= ret >> 4;
+ ret |= ret >> 8;
+ ret |= ret >> 16;
+ ret++;
+
+ return ret;
+ }
+}
+
+// End UnsynchronizedBuffer.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java
new file mode 100644
index 0000000..cd8a329
--- /dev/null
+++ b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ColumnMetaData.Rep;
+import org.apache.calcite.avatica.Meta.Signature;
+import org.apache.calcite.avatica.Meta.StatementHandle;
+import org.apache.calcite.avatica.proto.Common.WireMessage;
+import org.apache.calcite.avatica.proto.Requests;
+import org.apache.calcite.avatica.remote.Service.Request;
+
+import com.google.protobuf.HBaseZeroCopyByteString;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Protobuf serialization tests.
+ */
+public class ProtobufSerializationTest {
+
+ private Signature getSignature() {
+ return null;
+ }
+
+ private List<TypedValue> getTypedValues() {
+ List<TypedValue> paramValues =
+ Arrays.asList(TypedValue.create(Rep.BOOLEAN.name(), Boolean.TRUE),
+ TypedValue.create(Rep.STRING.name(), "string"));
+ return paramValues;
+ }
+
+ @Test public void testExecuteSerialization() throws Exception {
+ Service.ExecuteRequest executeRequest = new Service.ExecuteRequest(
+ new StatementHandle("connection", 12345, getSignature()), getTypedValues(), 0);
+
+ Requests.ExecuteRequest pbExecuteRequest = executeRequest.serialize();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+ pbExecuteRequest.writeTo(baos);
+
+ byte[] serialized = baos.toByteArray();
+ baos.reset();
+ WireMessage wireMsg = WireMessage.newBuilder().setName(Requests.ExecuteRequest.class.getName())
+ .setWrappedMessage(HBaseZeroCopyByteString.wrap(serialized)).build();
+ wireMsg.writeTo(baos);
+ serialized = baos.toByteArray();
+
+ ProtobufTranslation translator = new ProtobufTranslationImpl();
+
+ Request newRequest = translator.parseRequest(serialized);
+
+ Assert.assertEquals(executeRequest, newRequest);
+ }
+
+}
+
+// End ProtobufSerializationTest.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java b/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java
new file mode 100644
index 0000000..a448d3e
--- /dev/null
+++ b/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the UnsynchronizedBuffer.
+ */
+public class UnsynchronizedBufferTest {
+
+ @Test public void testArrayResizing() {
+ int size = 64;
+ int expected = 128;
+ for (int i = 0; i < 10; i++) {
+ // We keep being one byte short to contain this message
+ int next = UnsynchronizedBuffer.nextArraySize(size + 1);
+ assertEquals(expected, next);
+ size = next;
+ expected *= 2;
+ }
+ }
+}
+
+// End UnsynchronizedBufferTest.java