You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/03/03 18:30:48 UTC

[4/7] calcite git commit: [CALCITE-1094] Replace ByteArrayOutputStream to avoid synchronized writes

[CALCITE-1094] Replace ByteArrayOutputStream to avoid synchronized writes

Pull in the ZeroCopyByteString class. We can cap the amount of byte[]'s
that we are making by providing a buffer to the current thread. This also
avoids any synchronization on a typical object pool.


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/1d3a26df
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/1d3a26df
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/1d3a26df

Branch: refs/heads/master
Commit: 1d3a26dfac17fea458402a637449007dc095bced
Parents: 0de38aa
Author: Josh Elser <el...@apache.org>
Authored: Wed Mar 2 17:43:41 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Wed Mar 2 23:26:02 2016 -0500

----------------------------------------------------------------------
 .../avatica/server/AvaticaJsonHandler.java      |  16 +-
 .../avatica/server/AvaticaProtobufHandler.java  |  20 ++-
 .../protobuf/HBaseZeroCopyByteString.java       |  77 ++++++++++
 .../java/com/google/protobuf/package-info.java  |  26 ++++
 .../apache/calcite/avatica/AvaticaUtils.java    |  47 ++++--
 .../avatica/remote/ProtobufTranslation.java     |  10 +-
 .../avatica/remote/ProtobufTranslationImpl.java | 146 +++++++++++++++---
 .../avatica/remote/RequestTranslator.java       |   3 +-
 .../apache/calcite/avatica/remote/Service.java  |   8 +-
 .../calcite/avatica/remote/TypedValue.java      |   3 +-
 .../avatica/util/UnsynchronizedBuffer.java      | 152 +++++++++++++++++++
 .../remote/ProtobufSerializationTest.java       |  75 +++++++++
 .../avatica/util/UnsynchronizedBufferTest.java  |  41 +++++
 13 files changed, 578 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
index 703a2c3..34a9333 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
@@ -25,6 +25,7 @@ import org.apache.calcite.avatica.remote.Handler.HandlerResponse;
 import org.apache.calcite.avatica.remote.JsonHandler;
 import org.apache.calcite.avatica.remote.Service;
 import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
 
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -54,6 +55,8 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA
   final MetricsSystem metrics;
   final Timer requestTimer;
 
+  final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer;
+
   public AvaticaJsonHandler(Service service) {
     this(service, NoopMetricsSystem.getInstance());
   }
@@ -67,6 +70,12 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA
     // Metrics
     this.requestTimer = this.metrics.getTimer(
         concat(AvaticaJsonHandler.class, MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME));
+
+    this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() {
+      @Override public UnsynchronizedBuffer initialValue() {
+        return new UnsynchronizedBuffer();
+      }
+    };
   }
 
   public void handle(String target, Request baseRequest,
@@ -80,8 +89,13 @@ public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareA
         // The latter allows very large requests without hitting HTTP 413.
         String rawRequest = request.getHeader("request");
         if (rawRequest == null) {
+          // Avoid a new buffer creation for every HTTP request
+          final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
           try (ServletInputStream inputStream = request.getInputStream()) {
-            rawRequest = AvaticaUtils.readFully(inputStream);
+            rawRequest = AvaticaUtils.readFully(inputStream, buffer);
+          } finally {
+            // Reset the offset into the buffer after we're done
+            buffer.reset();
           }
         }
         final String jsonRequest =

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
index aeebad7..27e73de 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
@@ -28,6 +28,7 @@ import org.apache.calcite.avatica.remote.ProtobufTranslation;
 import org.apache.calcite.avatica.remote.ProtobufTranslationImpl;
 import org.apache.calcite.avatica.remote.Service;
 import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
 
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.handler.AbstractHandler;
@@ -55,19 +56,28 @@ public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAw
   private final MetricsSystem metrics;
   private final Timer requestTimer;
 
+  final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer;
+
   public AvaticaProtobufHandler(Service service) {
     this(service, NoopMetricsSystem.getInstance());
   }
 
   public AvaticaProtobufHandler(Service service, MetricsSystem metrics) {
-    this.protobufTranslation = new ProtobufTranslationImpl();
     this.service = Objects.requireNonNull(service);
     this.metrics = Objects.requireNonNull(metrics);
-    this.pbHandler = new ProtobufHandler(service, protobufTranslation, metrics);
 
     this.requestTimer = this.metrics.getTimer(
         MetricsHelper.concat(AvaticaProtobufHandler.class,
             MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME));
+
+    this.protobufTranslation = new ProtobufTranslationImpl();
+    this.pbHandler = new ProtobufHandler(service, protobufTranslation, metrics);
+
+    this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() {
+      @Override public UnsynchronizedBuffer initialValue() {
+        return new UnsynchronizedBuffer();
+      }
+    };
   }
 
   public void handle(String target, Request baseRequest,
@@ -78,8 +88,12 @@ public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAw
       response.setStatus(HttpServletResponse.SC_OK);
       if (request.getMethod().equals("POST")) {
         byte[] requestBytes;
+        // Avoid a new buffer creation for every HTTP request
+        final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
         try (ServletInputStream inputStream = request.getInputStream()) {
-          requestBytes = AvaticaUtils.readFullyToBytes(inputStream);
+          requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer);
+        } finally {
+          buffer.reset();
         }
 
         HandlerResponse<byte[]> handlerResponse = pbHandler.apply(requestBytes);

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java b/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java
new file mode 100644
index 0000000..62c4dd2
--- /dev/null
+++ b/avatica/src/main/java/com/google/protobuf/HBaseZeroCopyByteString.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.protobuf;
+
+/**
+ * Helper class to extract byte arrays from {@link ByteString} without copy.
+ *
+ * Without this protobufs would force us to copy every single byte array out
+ * of the objects de-serialized from the wire (which already do one copy, on
+ * top of the copies the JVM does to go from kernel buffer to C buffer and
+ * from C buffer to JVM buffer).
+ *
+ * Graciously copied from Apache HBase.
+ */
+public final class HBaseZeroCopyByteString extends LiteralByteString {
+  // Gotten from AsyncHBase code base with permission.
+  /** Private constructor so this class cannot be instantiated. */
+  private HBaseZeroCopyByteString() {
+    super(null);
+    throw new UnsupportedOperationException("Should never be here.");
+  }
+
+  /**
+   * Wraps a byte array in a {@link ByteString} without copying it.
+   *
+   * @param array The byte array to wrap
+   * @return a ByteString wrapping the <code>array</code>
+   */
+  public static ByteString wrap(final byte[] array) {
+    return new LiteralByteString(array);
+  }
+
+  /**
+   * Wraps a subset of a byte array in a {@link ByteString} without copying it.
+   *
+   * @param array The byte array to wrap
+   * @param offset the start of data in the array
+   * @param length The number of bytes of data at <code>offset</code>
+   * @return a ByteString wrapping the <code>array</code>
+   */
+  public static ByteString wrap(final byte[] array, int offset, int length) {
+    return new BoundedByteString(array, offset, length);
+  }
+
+
+  /**
+   * Extracts the byte array from the given {@link ByteString} without copy.
+   *
+   * @param buf A buffer from which to extract the array. This buffer must
+   * actually be an instance of a {@code LiteralByteString}.
+   * @return The underlying byte array of the ByteString
+   * @throws UnsupportedOperationException if buf is not a LiteralByteString
+   */
+  public static byte[] zeroCopyGetBytes(final ByteString buf) {
+    if (buf instanceof LiteralByteString) {
+      return ((LiteralByteString) buf).bytes;
+    }
+    throw new UnsupportedOperationException("Need a LiteralByteString, got a "
+                                            + buf.getClass().getName());
+  }
+}
+
+// End HBaseZeroCopyByteString.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/com/google/protobuf/package-info.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/com/google/protobuf/package-info.java b/avatica/src/main/java/com/google/protobuf/package-info.java
new file mode 100644
index 0000000..92f110e
--- /dev/null
+++ b/avatica/src/main/java/com/google/protobuf/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Avatica-custom classes to access protected classes in Google Protobuf.
+ */
+@PackageMarker
+package com.google.protobuf;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
index 9382f87..a999f19 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
@@ -16,13 +16,15 @@
  */
 package org.apache.calcite.avatica;
 
-import java.io.ByteArrayOutputStream;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.invoke.MethodHandle;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.MethodType;
 import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.AbstractList;
@@ -45,6 +47,12 @@ public class AvaticaUtils {
 
   private static final Set<String> UNIQUE_STRINGS = new HashSet<>();
 
+  private static final ThreadLocal<byte[]> PER_THREAD_BUFFER  = new ThreadLocal<byte[]>() {
+    @Override protected byte[] initialValue() {
+      return new byte[4096];
+    }
+  };
+
   private AvaticaUtils() {}
 
   static {
@@ -200,25 +208,46 @@ public class AvaticaUtils {
 
   /** Reads the contents of an input stream and returns as a string. */
   public static String readFully(InputStream inputStream) throws IOException {
-    return _readFully(inputStream).toString();
+    return readFully(inputStream, new UnsynchronizedBuffer(1024));
   }
 
+  /** Reads the contents of an input stream and returns as a string. */
+  public static String readFully(InputStream inputStream, UnsynchronizedBuffer buffer)
+      throws IOException {
+    // Variant that lets us use a pooled Buffer
+    final byte[] bytes = _readFully(inputStream, buffer);
+    return new String(bytes, 0, bytes.length, StandardCharsets.UTF_8);
+  }
+
+  /** Reads the contents of an input stream and returns it as a byte array. */
   public static byte[] readFullyToBytes(InputStream inputStream) throws IOException {
-    return _readFully(inputStream).toByteArray();
+    return readFullyToBytes(inputStream, new UnsynchronizedBuffer(1024));
   }
 
-  /** Reads the contents of an input stream and returns a ByteArrayOutputStrema. */
-  static ByteArrayOutputStream _readFully(InputStream inputStream) throws IOException {
-    final byte[] bytes = new byte[4096];
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+  /** Reads the contents of an input stream and returns it as a byte array. */
+  public static byte[] readFullyToBytes(InputStream inputStream, UnsynchronizedBuffer buffer)
+      throws IOException {
+    // Variant that lets us use a pooled Buffer
+    return _readFully(inputStream, buffer);
+  }
+
+  /**
+   * Reads the contents of an input stream and returns a byte array.
+   *
+   * @param inputStream the input to read from.
+   * @return A byte array whose length is equal to the number of bytes contained.
+   */
+  static byte[] _readFully(InputStream inputStream, UnsynchronizedBuffer buffer)
+      throws IOException {
+    final byte[] bytes = PER_THREAD_BUFFER.get();
     for (;;) {
       int count = inputStream.read(bytes, 0, bytes.length);
       if (count < 0) {
         break;
       }
-      baos.write(bytes, 0, count);
+      buffer.write(bytes, 0, count);
     }
-    return baos;
+    return buffer.toArray();
   }
 
   /** Invokes {@code Statement#setLargeMaxRows}, falling back on

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
index acb82db..7142d59 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
@@ -19,8 +19,6 @@ package org.apache.calcite.avatica.remote;
 import org.apache.calcite.avatica.remote.Service.Request;
 import org.apache.calcite.avatica.remote.Service.Response;
 
-import com.google.protobuf.InvalidProtocolBufferException;
-
 import java.io.IOException;
 
 /**
@@ -49,18 +47,18 @@ public interface ProtobufTranslation {
    *
    * @param bytes Serialized protocol buffer request from client
    * @return A Request object for the given bytes
-   * @throws InvalidProtocolBufferException If the protocol buffer cannot be deserialized
+   * @throws IOException If the protocol buffer cannot be deserialized
    */
-  Request parseRequest(byte[] bytes) throws InvalidProtocolBufferException;
+  Request parseRequest(byte[] bytes) throws IOException;
 
   /**
    * Parses a serialized protocol buffer response into a {@link Response}.
    *
    * @param bytes Serialized protocol buffer request from server
    * @return The Response object for the given bytes
-   * @throws InvalidProtocolBufferException If the protocol buffer cannot be deserialized
+   * @throws IOException If the protocol buffer cannot be deserialized
    */
-  Response parseResponse(byte[] bytes) throws InvalidProtocolBufferException;
+  Response parseResponse(byte[] bytes) throws IOException;
 }
 
 // End ProtobufTranslation.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
index 646d706..80d2b22 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
@@ -54,17 +54,22 @@ import org.apache.calcite.avatica.proto.Responses.SyncResultsResponse;
 import org.apache.calcite.avatica.remote.Service.Request;
 import org.apache.calcite.avatica.remote.Service.Response;
 import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
 
 import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.Message;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 /**
  * Implementation of {@link ProtobufTranslationImpl} that translates
@@ -75,9 +80,10 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
   // Extremely ugly mapping of PB class name into a means to convert it to the POJO
   private static final Map<String, RequestTranslator> REQUEST_PARSERS;
   private static final Map<String, ResponseTranslator> RESPONSE_PARSERS;
+  private static final Map<Class<?>, ByteString> MESSAGE_CLASSES;
 
   static {
-    HashMap<String, RequestTranslator> reqParsers = new HashMap<>();
+    Map<String, RequestTranslator> reqParsers = new ConcurrentHashMap<>();
     reqParsers.put(CatalogsRequest.class.getName(),
         new RequestTranslator(CatalogsRequest.parser(), new Service.CatalogsRequest()));
     reqParsers.put(OpenConnectionRequest.class.getName(),
@@ -123,7 +129,7 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
 
     REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers);
 
-    HashMap<String, ResponseTranslator> respParsers = new HashMap<>();
+    Map<String, ResponseTranslator> respParsers = new ConcurrentHashMap<>();
     respParsers.put(OpenConnectionResponse.class.getName(),
         new ResponseTranslator(OpenConnectionResponse.parser(),
             new Service.OpenConnectionResponse()));
@@ -162,8 +168,65 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
         new ResponseTranslator(RollbackResponse.parser(), new Service.RollbackResponse()));
 
     RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers);
+
+    Map<Class<?>, ByteString> messageClassNames = new ConcurrentHashMap<>();
+    for (Class<?> msgClz : getAllMessageClasses()) {
+      messageClassNames.put(msgClz, wrapClassName(msgClz));
+    }
+    MESSAGE_CLASSES = Collections.unmodifiableMap(messageClassNames);
+  }
+
+  private static List<Class<?>> getAllMessageClasses() {
+    List<Class<?>> messageClasses = new ArrayList<>();
+    messageClasses.add(CatalogsRequest.class);
+    messageClasses.add(CloseConnectionRequest.class);
+    messageClasses.add(CloseStatementRequest.class);
+    messageClasses.add(ColumnsRequest.class);
+    messageClasses.add(CommitRequest.class);
+    messageClasses.add(ConnectionSyncRequest.class);
+    messageClasses.add(CreateStatementRequest.class);
+    messageClasses.add(DatabasePropertyRequest.class);
+    messageClasses.add(ExecuteRequest.class);
+    messageClasses.add(FetchRequest.class);
+    messageClasses.add(OpenConnectionRequest.class);
+    messageClasses.add(PrepareAndExecuteRequest.class);
+    messageClasses.add(PrepareRequest.class);
+    messageClasses.add(RollbackRequest.class);
+    messageClasses.add(SchemasRequest.class);
+    messageClasses.add(SyncResultsRequest.class);
+    messageClasses.add(TableTypesRequest.class);
+    messageClasses.add(TablesRequest.class);
+    messageClasses.add(TypeInfoRequest.class);
+    messageClasses.add(CloseConnectionResponse.class);
+    messageClasses.add(CloseStatementResponse.class);
+    messageClasses.add(CommitResponse.class);
+    messageClasses.add(ConnectionSyncResponse.class);
+    messageClasses.add(CreateStatementResponse.class);
+    messageClasses.add(DatabasePropertyResponse.class);
+    messageClasses.add(ErrorResponse.class);
+    messageClasses.add(ExecuteResponse.class);
+    messageClasses.add(FetchResponse.class);
+    messageClasses.add(OpenConnectionResponse.class);
+    messageClasses.add(PrepareResponse.class);
+    messageClasses.add(ResultSetResponse.class);
+    messageClasses.add(RollbackResponse.class);
+    messageClasses.add(RpcMetadata.class);
+    messageClasses.add(SyncResultsResponse.class);
+
+    return messageClasses;
+  }
+
+  private static ByteString wrapClassName(Class<?> clz) {
+    return HBaseZeroCopyByteString.wrap(clz.getName().getBytes(UTF_8));
   }
 
+  private final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer =
+      new ThreadLocal<UnsynchronizedBuffer>() {
+        @Override protected UnsynchronizedBuffer initialValue() {
+          return new UnsynchronizedBuffer();
+        }
+      };
+
   /**
    * Fetches the concrete message's Parser implementation.
    *
@@ -207,42 +270,79 @@ public class ProtobufTranslationImpl implements ProtobufTranslation {
   }
 
   @Override public byte[] serializeResponse(Response response) throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    Message responseMsg = response.serialize();
-    serializeMessage(out, responseMsg);
-    return out.toByteArray();
+    // Avoid BAOS for its synchronized write methods, we don't need that concurrency control
+    UnsynchronizedBuffer out = threadLocalBuffer.get();
+    try {
+      Message responseMsg = response.serialize();
+      serializeMessage(out, responseMsg);
+      return out.toArray();
+    } finally {
+      out.reset();
+    }
   }
 
   @Override public byte[] serializeRequest(Request request) throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    Message requestMsg = request.serialize();
-    serializeMessage(out, requestMsg);
-    return out.toByteArray();
+    // Avoid BAOS for its synchronized write methods, we don't need that concurrency control
+    UnsynchronizedBuffer out = threadLocalBuffer.get();
+    try {
+      Message requestMsg = request.serialize();
+      serializeMessage(out, requestMsg);
+      return out.toArray();
+    } finally {
+      out.reset();
+    }
   }
 
   void serializeMessage(OutputStream out, Message msg) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    msg.writeTo(baos);
+    // Serialize the protobuf message
+    UnsynchronizedBuffer buffer = threadLocalBuffer.get();
+    ByteString serializedMsg;
+    try {
+      msg.writeTo(buffer);
+      // Make a bytestring from it
+      serializedMsg = HBaseZeroCopyByteString.wrap(buffer.toArray());
+    } finally {
+      buffer.reset();
+    }
 
-    // TODO Using ByteString is copying the bytes of the message which sucks. Could try to
-    // lift the ZeroCopy implementation from HBase.
-    WireMessage wireMsg = WireMessage.newBuilder().setName(msg.getClass().getName()).
-        setWrappedMessage(ByteString.copyFrom(baos.toByteArray())).build();
+    // Wrap the serialized message in a WireMessage
+    WireMessage wireMsg = WireMessage.newBuilder().setNameBytes(getClassNameBytes(msg.getClass()))
+        .setWrappedMessage(serializedMsg).build();
 
+    // Write the WireMessage to the provided OutputStream
     wireMsg.writeTo(out);
   }
 
-  @Override public Request parseRequest(byte[] bytes) throws InvalidProtocolBufferException {
-    WireMessage wireMsg = WireMessage.parseFrom(bytes);
+  ByteString getClassNameBytes(Class<?> clz) {
+    ByteString byteString = MESSAGE_CLASSES.get(clz);
+    if (null == byteString) {
+      throw new IllegalArgumentException("Missing ByteString for " + clz.getName());
+    }
+    return byteString;
+  }
+
+  @Override public Request parseRequest(byte[] bytes) throws IOException {
+    ByteString byteString = HBaseZeroCopyByteString.wrap(bytes);
+    CodedInputStream inputStream = byteString.newCodedInput();
+    // Enable aliasing to avoid an extra copy to get at the serialized Request inside of the
+    // WireMessage.
+    inputStream.enableAliasing(true);
+    WireMessage wireMsg = WireMessage.parseFrom(inputStream);
 
     String serializedMessageClassName = wireMsg.getName();
     RequestTranslator translator = getParserForRequest(serializedMessageClassName);
 
+    // The ByteString should be logical offsets into the original byte array
     return translator.transform(wireMsg.getWrappedMessage());
   }
 
-  @Override public Response parseResponse(byte[] bytes) throws InvalidProtocolBufferException {
-    WireMessage wireMsg = WireMessage.parseFrom(bytes);
+  @Override public Response parseResponse(byte[] bytes) throws IOException {
+    ByteString byteString = HBaseZeroCopyByteString.wrap(bytes);
+    CodedInputStream inputStream = byteString.newCodedInput();
+    // Enable aliasing to avoid an extra copy to get at the serialized Response inside of the
+    // WireMessage.
+    inputStream.enableAliasing(true);
+    WireMessage wireMsg = WireMessage.parseFrom(inputStream);
 
     String serializedMessageClassName = wireMsg.getName();
     ResponseTranslator translator = getParserForResponse(serializedMessageClassName);

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
index 0dadc78..417c6ed 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
@@ -36,7 +36,8 @@ public class RequestTranslator {
 
   public Service.Request transform(ByteString serializedMessage) throws
       InvalidProtocolBufferException {
-    Message msg = parser.parseFrom(serializedMessage);
+    // This ByteString should already alias the bytes read during the WireMessage parsing.
+    Message msg = parser.parseFrom(serializedMessage.newCodedInput());
     return impl.deserialize(msg);
   }
 }

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index aee5b29..5790848 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -31,8 +31,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
+import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.Message;
 
 import java.io.PrintWriter;
@@ -2549,13 +2550,16 @@ public interface Service {
     private static final FieldDescriptor SERVER_ADDRESS_DESCRIPTOR = Responses.RpcMetadata
         .getDescriptor().findFieldByNumber(Responses.RpcMetadata.SERVER_ADDRESS_FIELD_NUMBER);
     public final String serverAddress;
+    private final ByteString serverAddressAsBytes;
 
     public RpcMetadataResponse() {
       this.serverAddress = null;
+      this.serverAddressAsBytes = null;
     }
 
     public RpcMetadataResponse(@JsonProperty("serverAddress") String serverAddress) {
       this.serverAddress = serverAddress;
+      this.serverAddressAsBytes = HBaseZeroCopyByteString.wrap(serverAddress.getBytes());
     }
 
     @Override RpcMetadataResponse deserialize(Message genericMsg) {
@@ -2566,7 +2570,7 @@ public interface Service {
     }
 
     @Override Responses.RpcMetadata serialize() {
-      return Responses.RpcMetadata.newBuilder().setServerAddress(serverAddress).build();
+      return Responses.RpcMetadata.newBuilder().setServerAddressBytes(serverAddressAsBytes).build();
     }
 
     static RpcMetadataResponse fromProto(Responses.RpcMetadata msg) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java
index 5c80816..d96293b 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/TypedValue.java
@@ -23,6 +23,7 @@ import org.apache.calcite.avatica.util.DateTimeUtils;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.protobuf.HBaseZeroCopyByteString;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -345,7 +346,7 @@ public class TypedValue {
       break;
     case BYTE_STRING:
     case STRING:
-      builder.setStringValue((String) value);
+      builder.setStringValueBytes(HBaseZeroCopyByteString.wrap(((String) value).getBytes()));
       break;
     case PRIMITIVE_CHAR:
     case CHARACTER:

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java b/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java
new file mode 100644
index 0000000..8daee60
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/util/UnsynchronizedBuffer.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A utility class for writing bytes to a growable byte buffer without synchronization. A
+ * reduced variant taken from Apache Accumulo. This class is <b>not</b> thread-safe by design.
+ * It is up to the caller to guarantee mutual exclusion as necessary.
+ */
+public class UnsynchronizedBuffer extends OutputStream {
+  // reset() replaces the backing buffer once it has grown to 64K or more
+  private static final int LARGE_BUFFER_SIZE = 1024 * 64;
+
+  final int initialCapacity;
+  int offset = 0;
+  byte[] data;
+
+  /**
+   * Creates a new buffer with an initial capacity of 4096 bytes.
+   */
+  public UnsynchronizedBuffer() {
+    this(4096);
+  }
+
+  /**
+   * Creates a new buffer whose backing array starts at the given size.
+   *
+   * @param initialCapacity initial byte capacity; also the size a large buffer is reset to
+   */
+  public UnsynchronizedBuffer(int initialCapacity) {
+    this.initialCapacity = initialCapacity;
+    data = new byte[initialCapacity];
+  }
+
+  private void reserve(int l) { // grows the backing array so that l more bytes fit
+    if (offset + l > data.length) { // NOTE(review): offset + l may overflow int for huge writes
+      int newSize = UnsynchronizedBuffer.nextArraySize(offset + l);
+
+      byte[] newData = new byte[newSize];
+      System.arraycopy(data, 0, newData, 0, offset); // keep the bytes written so far
+      data = newData;
+    }
+
+  }
+
+  /**
+   * Appends bytes to this buffer, growing the backing array first if they do not fit.
+   *
+   * @param bytes byte array
+   * @param off offset into array to start copying bytes
+   * @param length number of bytes to add
+   * @throws IndexOutOfBoundsException if off or length are invalid
+   */
+  public void write(byte[] bytes, int off, int length) {
+    reserve(length);
+    System.arraycopy(bytes, off, data, offset, length);
+    offset += length;
+  }
+
+  @Override public void write(int b) throws IOException {
+    reserve(1);
+    data[offset] = (byte) b; // only the low-order eight bits of b are stored
+    offset++;
+  }
+
+  /**
+   * Returns a copy of the bytes written since construction or the last {@link #reset()}.
+   *
+   * @return a new array holding the first {@link #getOffset()} bytes of the backing array
+   */
+  public byte[] toArray() {
+    byte[] ret = new byte[offset];
+    System.arraycopy(data, 0, ret, 0, offset);
+    return ret;
+  }
+
+  /**
+   * Resets the write offset to zero, reallocating a 64K-or-larger backing array at initial size.
+   */
+  public void reset() {
+    offset = 0;
+    if (data.length >= LARGE_BUFFER_SIZE) {
+      data = new byte[this.initialCapacity];
+    }
+  }
+
+  /**
+   * @return The current offset into the backing array, i.e. the number of bytes written so far.
+   */
+  public int getOffset() {
+    return offset;
+  }
+
+  /**
+   * @return The current length of the backing array (its capacity, not the bytes written).
+   */
+  public long getSize() {
+    return data.length;
+  }
+
+  /**
+   * Determines what next array size should be by rounding up to next power of two.
+   *
+   * @param i minimum required array size
+   * @return smallest power of two that is at least i, or Integer.MAX_VALUE if i exceeds 2^30
+   * @throws IllegalArgumentException if i is negative
+   */
+  public static int nextArraySize(int i) {
+    if (i < 0) {
+      throw new IllegalArgumentException();
+    }
+
+    if (i > (1 << 30)) {
+      return Integer.MAX_VALUE; // 2^31 does not fit in an int, so use 2^31 - 1 as a special case
+    }
+
+    if (i == 0) {
+      return 1;
+    }
+
+    // round up to next power of two: copy the top set bit of (i - 1) into all lower bits, add 1
+    int ret = i;
+    ret--;
+    ret |= ret >> 1;
+    ret |= ret >> 2;
+    ret |= ret >> 4;
+    ret |= ret >> 8;
+    ret |= ret >> 16;
+    ret++;
+
+    return ret;
+  }
+}
+
+// End UnsynchronizedBuffer.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java
new file mode 100644
index 0000000..cd8a329
--- /dev/null
+++ b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ColumnMetaData.Rep;
+import org.apache.calcite.avatica.Meta.Signature;
+import org.apache.calcite.avatica.Meta.StatementHandle;
+import org.apache.calcite.avatica.proto.Common.WireMessage;
+import org.apache.calcite.avatica.proto.Requests;
+import org.apache.calcite.avatica.remote.Service.Request;
+
+import com.google.protobuf.HBaseZeroCopyByteString;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests that a request serialized to protobuf can be parsed back into an equal request.
+ */
+public class ProtobufSerializationTest {
+
+  private Signature getSignature() {
+    return null; // the test builds its StatementHandle without a signature
+  }
+
+  private List<TypedValue> getTypedValues() {
+    List<TypedValue> paramValues =
+        Arrays.asList(TypedValue.create(Rep.BOOLEAN.name(), Boolean.TRUE),
+            TypedValue.create(Rep.STRING.name(), "string"));
+    return paramValues;
+  }
+
+  @Test public void testExecuteSerialization() throws Exception {
+    Service.ExecuteRequest executeRequest = new Service.ExecuteRequest(
+        new StatementHandle("connection", 12345, getSignature()), getTypedValues(), 0);
+
+    Requests.ExecuteRequest pbExecuteRequest = executeRequest.serialize();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+    pbExecuteRequest.writeTo(baos);
+
+    byte[] serialized = baos.toByteArray();
+    baos.reset(); // reuse the stream for the WireMessage that wraps the serialized request
+    WireMessage wireMsg = WireMessage.newBuilder().setName(Requests.ExecuteRequest.class.getName())
+        .setWrappedMessage(HBaseZeroCopyByteString.wrap(serialized)).build();
+    wireMsg.writeTo(baos);
+    serialized = baos.toByteArray();
+
+    ProtobufTranslation translator = new ProtobufTranslationImpl();
+
+    Request newRequest = translator.parseRequest(serialized);
+
+    Assert.assertEquals(executeRequest, newRequest); // the round trip must preserve the request
+  }
+
+}
+
+// End ProtobufSerializationTest.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/1d3a26df/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java b/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java
new file mode 100644
index 0000000..a448d3e
--- /dev/null
+++ b/avatica/src/test/java/org/apache/calcite/avatica/util/UnsynchronizedBufferTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link UnsynchronizedBuffer}.
+ */
+public class UnsynchronizedBufferTest {
+
+  @Test public void testArrayResizing() {
+    int size = 64;
+    int expected = 128;
+    for (int i = 0; i < 10; i++) {
+      // Needing one byte more than the current power-of-two size must double the array size
+      int next = UnsynchronizedBuffer.nextArraySize(size + 1);
+      assertEquals(expected, next);
+      size = next;
+      expected *= 2;
+    }
+  }
+}
+
+// End UnsynchronizedBufferTest.java