You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/09/17 02:10:49 UTC

[3/8] incubator-calcite git commit: [CALCITE-840] Protocol buffer serialization over HTTP for Avatica Server (Josh Elser)

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
new file mode 100644
index 0000000..e4a1b3e
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A common base class for {@link Service} implementations that implement
+ * modifications made to response objects.
+ */
+public abstract class AbstractService implements Service {
+
+  /** Produces the signature to hand to accessors by finagling every column.
+   * JSON transmits a small long value, or a float which is a whole number, as
+   * an integer, so the accessors need to be prepared to accept any numeric
+   * type. The argument itself is returned when no column changes. */
+  Meta.Signature finagle(Meta.Signature signature) {
+    final List<ColumnMetaData> converted = new ArrayList<>();
+    for (ColumnMetaData original : signature.columns) {
+      converted.add(finagle(original));
+    }
+    if (!converted.equals(signature.columns)) {
+      return new Meta.Signature(converted, signature.sql,
+          signature.parameters, signature.internalParameters,
+          signature.cursorFactory);
+    }
+    return signature;
+  }
+
+  /** Picks the representation used for one column; the column is returned
+   * unchanged when neither its representation nor its JDBC type applies. */
+  ColumnMetaData finagle(ColumnMetaData column) {
+    switch (column.type.rep) {
+    case PRIMITIVE_BYTE:
+    case PRIMITIVE_SHORT:
+    case PRIMITIVE_INT:
+    case PRIMITIVE_LONG:
+    case PRIMITIVE_FLOAT:
+    case PRIMITIVE_DOUBLE:
+    case BYTE:
+    case SHORT:
+    case INTEGER:
+    case LONG:
+    case FLOAT:
+    case DOUBLE:
+      // Every fixed-width numeric representation collapses to NUMBER.
+      return column.setRep(ColumnMetaData.Rep.NUMBER);
+    default:
+      break; // not decided by representation; fall back to the JDBC type
+    }
+    final int jdbcType = column.type.id;
+    if (jdbcType == Types.VARBINARY || jdbcType == Types.BINARY) {
+      return column.setRep(ColumnMetaData.Rep.STRING);
+    }
+    if (jdbcType == Types.DECIMAL || jdbcType == Types.NUMERIC) {
+      return column.setRep(ColumnMetaData.Rep.NUMBER);
+    }
+    return column;
+  }
+
+  PrepareResponse finagle(PrepareResponse response) {
+    final Meta.StatementHandle finagled = finagle(response.statement);
+    // Getting the same instance back means nothing changed: reuse the response.
+    return finagled == response.statement
+        ? response
+        : new PrepareResponse(finagled);
+  }
+
+  Meta.StatementHandle finagle(Meta.StatementHandle h) {
+    final Meta.Signature finagled = finagle(h.signature);
+    // Identity comparison: an untouched signature means the handle is kept.
+    return finagled == h.signature
+        ? h
+        : new Meta.StatementHandle(h.connectionId, h.id, finagled);
+  }
+
+  ResultSetResponse finagle(ResultSetResponse r) {
+    if (r.updateCount != -1) { // an update result; asserted to have no signature
+      assert r.signature == null;
+      return r;
+    }
+    final Meta.Signature finagled = finagle(r.signature);
+    if (finagled != r.signature) {
+      return new ResultSetResponse(r.connectionId, r.statementId,
+          r.ownStatement, finagled, r.firstFrame, r.updateCount);
+    }
+    return r;
+  }
+
+  ExecuteResponse finagle(ExecuteResponse r) {
+    final List<ResultSetResponse> finagled = new ArrayList<>();
+    boolean changed = false;
+    for (ResultSetResponse original : r.results) {
+      final ResultSetResponse replacement = finagle(original);
+      // Identity, not equality: finagle returns its argument when unchanged.
+      changed |= replacement != original;
+      finagled.add(replacement);
+    }
+    // Only build a new response if at least one result was rewritten.
+    if (!changed) {
+      return r;
+    }
+    return new ExecuteResponse(finagled);
+  }
+}
+
+// End AbstractService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
index 16fcbc0..5e755f8 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
@@ -36,8 +36,7 @@ public enum AvaticaRemoteConnectionProperty implements ConnectionProperty {
   private final Type type;
   private final Object defaultValue;
 
-  private static final Map<String, AvaticaRemoteConnectionProperty>
-  NAME_TO_PROPS;
+  private static final Map<String, AvaticaRemoteConnectionProperty> NAME_TO_PROPS;
 
   static {
     NAME_TO_PROPS = new HashMap<String, AvaticaRemoteConnectionProperty>();

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java
index 9e9c623..7d8c058 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java
@@ -45,6 +45,14 @@ public class Driver extends UnregisteredDriver {
     super();
   }
 
+  /**
+   * The message serialization formats that the Driver can use.
+   */
+  public enum Serialization {
+    JSON,
+    PROTOBUF
+  }
+
   @Override protected String getConnectStringPrefix() {
     return CONNECT_STRING_PREFIX;
   }
@@ -79,12 +87,39 @@ public class Driver extends UnregisteredDriver {
       } catch (MalformedURLException e) {
         throw new RuntimeException(e);
       }
-      service = new RemoteService(url);
+
+      Serialization serializationType = getSerialization(config);
+
+      switch (serializationType) {
+      case JSON:
+        service = new RemoteService(url);
+        break;
+      case PROTOBUF:
+        service = new RemoteProtobufService(url, new ProtobufTranslationImpl());
+        break;
+      default:
+        throw new IllegalArgumentException("Unhandled serialization type: " + serializationType);
+      }
     } else {
       service = new MockJsonService(Collections.<String, String>emptyMap());
     }
     return new RemoteMeta(connection, service);
   }
+
+  /** Resolves the configured serialization, defaulting to JSON when unset;
+   * throws IllegalArgumentException for an unrecognized name. */
+  private Serialization getSerialization(ConnectionConfig config) {
+    final String serializationStr = config.serialization();
+    if (null == serializationStr) {
+      return Serialization.JSON;
+    }
+    try { // Locale.ROOT keeps the lookup independent of the JVM's default locale
+      return Serialization.valueOf(serializationStr.toUpperCase(java.util.Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(
+          "Unknown serialization '" + serializationStr + "'; expected JSON or PROTOBUF", e);
+    }
+  }
 }
 
 // End Driver.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/Handler.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Handler.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Handler.java
index 94eb01e..a3f564d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Handler.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Handler.java
@@ -18,9 +18,11 @@ package org.apache.calcite.avatica.remote;
 
 /**
  * API for text request-response calls to an Avatica server.
+ *
+ * @param <T> The type this handler accepts and returns
  */
-public interface Handler {
-  String apply(String request);
+public interface Handler<T> {
+  T apply(T request);
 }
 
 // End Handler.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
index dee0636..f59218f 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
@@ -28,7 +28,7 @@ import java.io.StringWriter;
  *
  * @see org.apache.calcite.avatica.remote.JsonService
  */
-public class JsonHandler implements Handler {
+public class JsonHandler implements Handler<String> {
   private final Service service;
 
   protected static final ObjectMapper MAPPER = JsonService.MAPPER;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
index 5c81b7b..dcb3d28 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -16,25 +16,19 @@
  */
 package org.apache.calcite.avatica.remote;
 
-import org.apache.calcite.avatica.ColumnMetaData;
-import org.apache.calcite.avatica.Meta;
-
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import java.io.IOException;
 import java.io.StringWriter;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * Implementation of {@link org.apache.calcite.avatica.remote.Service}
  * that encodes requests and responses as JSON.
  */
-public abstract class JsonService implements Service {
-  protected static final ObjectMapper MAPPER;
+public abstract class JsonService extends AbstractService {
+  public static final ObjectMapper MAPPER;
   static {
     MAPPER = new ObjectMapper();
     MAPPER.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
@@ -49,96 +43,6 @@ public abstract class JsonService implements Service {
    * responses to and from the peer service. */
   public abstract String apply(String request);
 
-  /** Modifies a signature, changing the representation of numeric columns
-   * within it. This deals with the fact that JSON transmits a small long value,
-   * or a float which is a whole number, as an integer. Thus the accessors need
-   * be prepared to accept any numeric type. */
-  private static Meta.Signature finagle(Meta.Signature signature) {
-    final List<ColumnMetaData> columns = new ArrayList<>();
-    for (ColumnMetaData column : signature.columns) {
-      columns.add(finagle(column));
-    }
-    if (columns.equals(signature.columns)) {
-      return signature;
-    }
-    return new Meta.Signature(columns, signature.sql,
-        signature.parameters, signature.internalParameters,
-        signature.cursorFactory);
-  }
-
-  private static ColumnMetaData finagle(ColumnMetaData column) {
-    switch (column.type.rep) {
-    case BYTE:
-    case PRIMITIVE_BYTE:
-    case DOUBLE:
-    case PRIMITIVE_DOUBLE:
-    case FLOAT:
-    case PRIMITIVE_FLOAT:
-    case INTEGER:
-    case PRIMITIVE_INT:
-    case SHORT:
-    case PRIMITIVE_SHORT:
-    case LONG:
-    case PRIMITIVE_LONG:
-      return column.setRep(ColumnMetaData.Rep.NUMBER);
-    }
-    switch (column.type.id) {
-    case Types.VARBINARY:
-    case Types.BINARY:
-      return column.setRep(ColumnMetaData.Rep.STRING);
-    case Types.DECIMAL:
-    case Types.NUMERIC:
-      return column.setRep(ColumnMetaData.Rep.NUMBER);
-    default:
-      return column;
-    }
-  }
-
-  private static PrepareResponse finagle(PrepareResponse response) {
-    final Meta.StatementHandle statement = finagle(response.statement);
-    if (statement == response.statement) {
-      return response;
-    }
-    return new PrepareResponse(statement);
-  }
-
-  private static Meta.StatementHandle finagle(Meta.StatementHandle h) {
-    final Meta.Signature signature = finagle(h.signature);
-    if (signature == h.signature) {
-      return h;
-    }
-    return new Meta.StatementHandle(h.connectionId, h.id, signature);
-  }
-
-  private static ResultSetResponse finagle(ResultSetResponse r) {
-    if (r.updateCount != -1) {
-      assert r.signature == null;
-      return r;
-    }
-    final Meta.Signature signature = finagle(r.signature);
-    if (signature == r.signature) {
-      return r;
-    }
-    return new ResultSetResponse(r.connectionId, r.statementId, r.ownStatement,
-        signature, r.firstFrame, r.updateCount);
-  }
-
-  private static ExecuteResponse finagle(ExecuteResponse r) {
-    final List<ResultSetResponse> results = new ArrayList<>();
-    int changeCount = 0;
-    for (ResultSetResponse result : r.results) {
-      ResultSetResponse result2 = finagle(result);
-      if (result2 != result) {
-        ++changeCount;
-      }
-      results.add(result2);
-    }
-    if (changeCount == 0) {
-      return r;
-    }
-    return new ExecuteResponse(results);
-  }
-
   //@VisibleForTesting
   protected static <T> T decode(String response, Class<T> valueType)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
new file mode 100644
index 0000000..76e2392
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import java.io.IOException;
+
+/**
+ * A Service implementation that performs protocol buffer serialization on requests and responses
+ * on either side of computing a response from a request to mimic some transport to a server which
+ * would normally perform such computation.
+ */
+public class LocalProtobufService extends ProtobufService {
+  private final Service service;
+  private final ProtobufTranslation translation;
+
+  public LocalProtobufService(Service service, ProtobufTranslation translation) {
+    this.service = service;
+    this.translation = translation;
+  }
+
+  @Override public Response _apply(Request request) {
+    try {
+      // Client side: encode the request as it would be put on the wire.
+      final byte[] wireRequest = translation.serializeRequest(request);
+
+      // (a real deployment would transport the bytes to the server here)
+
+      // Server side: decode the request and let the service answer it.
+      final Request decoded = translation.parseRequest(wireRequest);
+      final Response computed = decoded.accept(service);
+      // Server side: encode the answer for the trip back.
+      final byte[] wireResponse = translation.serializeResponse(computed);
+
+      // (a real deployment would transport the bytes to the client here)
+
+      // Client side: decode the response.
+      return translation.parseResponse(wireResponse);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
+
+// End LocalProtobufService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
new file mode 100644
index 0000000..5a455fc
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.MetaImpl;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A mock implementation of ProtobufService for testing.
+ *
+ * <p>It performs no serialization of requests and responses.
+ */
+public class MockProtobufService extends ProtobufService {
+
+  private static final Map<Request, Response> MAPPING;
+  static {
+    HashMap<Request, Response> mappings = new HashMap<>();
+
+    // Add in mappings
+
+    // Get the schemas, no schemas exist
+    mappings.put(
+        new SchemasRequest(null, null),
+        new ResultSetResponse(null, 1, true, null, Meta.Frame.EMPTY, -1));
+
+    // Get the tables, no tables exist
+    mappings.put(new TablesRequest(null, null, null, Collections.<String>emptyList()),
+        new ResultSetResponse(null, 150, true, null, Meta.Frame.EMPTY, -1));
+
+    // Create a statement, get back an id
+    mappings.put(new CreateStatementRequest("0"), new CreateStatementResponse("0", 1));
+
+    // Prepare and execute a query. Values and schema are returned
+    mappings.put(
+        new PrepareAndExecuteRequest("0", 1,
+            "select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)", -1),
+        new ResultSetResponse("0", 1, true,
+            Meta.Signature.create(
+                Arrays.<ColumnMetaData>asList(
+                    MetaImpl.columnMetaData("C1", 0, Integer.class),
+                    MetaImpl.columnMetaData("C2", 1, String.class)),
+                null, null, Meta.CursorFactory.ARRAY),
+            Meta.Frame.create(0, true,
+                Arrays.<Object>asList(new Object[] {1, "a"},
+                    new Object[] {null, "b"}, new Object[] {3, "c"})), -1));
+
+    // Prepare a query. The schema for the results is returned, but no values
+    mappings.put(
+        new PrepareRequest("0",
+            "select * from (\\n values(1, 'a'), (null, 'b'), (3, 'c')), as t (c1, c2)", -1),
+        new ResultSetResponse("0", 1, true,
+            Meta.Signature.create(
+                Arrays.<ColumnMetaData>asList(
+                    MetaImpl.columnMetaData("C1", 0, Integer.class),
+                    MetaImpl.columnMetaData("C2", 1, String.class)),
+                null, Collections.<AvaticaParameter>emptyList(),
+                Meta.CursorFactory.ARRAY),
+            null, -1));
+
+    MAPPING = Collections.unmodifiableMap(mappings);
+  }
+
+  @Override public Response _apply(Request request) {
+    // Closing a connection is answered directly, without a canned mapping.
+    if (!(request instanceof CloseConnectionRequest)) {
+      return dispatch(request);
+    }
+    return new CloseConnectionResponse();
+  }
+
+  /**
+   * Looks up the canned response registered for a request.
+   *
+   * @param request the client's request
+   * @return the response mapped to that request
+   * @throws RuntimeException if no mapping is found for the request
+   */
+  private Response dispatch(Request request) {
+    final Response canned = MAPPING.get(request);
+    if (null != canned) {
+      return canned;
+    }
+
+    // Unknown request: fail loudly rather than return null.
+    throw new RuntimeException("Had no response mapping for " + request);
+  }
+
+  /**
+   * A factory that instantiates the mock protobuf service.
+   */
+  public static class MockProtobufServiceFactory implements Service.Factory {
+    @Override public Service create(AvaticaConnection connection) {
+      return new MockProtobufService();
+    }
+  }
+}
+
+// End MockProtobufService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java
new file mode 100644
index 0000000..52f0a03
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.remote.Service.Response;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import java.io.IOException;
+
+/**
+ * Dispatches serialized protocol buffer messages to the provided {@link Service}
+ * by converting them to the POJO Request. Returns back the serialized protocol
+ * buffer response.
+ */
+public class ProtobufHandler implements Handler<byte[]> {
+
+  private final Service service;
+  private final ProtobufTranslation translation;
+
+  public ProtobufHandler(Service service, ProtobufTranslation translation) {
+    this.service = service;
+    this.translation = translation;
+  }
+
+  @Override public byte[] apply(byte[] requestBytes) {
+    // The serialized form also names the protocol buffer request class, so
+    // decoding it into the matching POJO is left entirely to the translation.
+    final Service.Request decoded;
+    try {
+      decoded = translation.parseRequest(requestBytes);
+    } catch (InvalidProtocolBufferException e) {
+      throw new RuntimeException(e);
+    }
+
+    // Let the service compute the answer for the decoded request.
+    final Response answer = decoded.accept(service);
+    // Encode the answer so it can be written back to the caller.
+    final byte[] responseBytes;
+    try {
+      responseBytes = translation.serializeResponse(answer);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return responseBytes;
+  }
+}
+
+// End ProtobufHandler.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
new file mode 100644
index 0000000..13d5860
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Message;
+
+/**
+ * Service implementation that encodes requests and responses as protocol buffers.
+ */
+public abstract class ProtobufService extends AbstractService {
+
+  /**
+   * Derived class should implement this method to transport requests and
+   * responses to and from the peer service.
+   */
+  public abstract Response _apply(Request request);
+
+  @Override public ResultSetResponse apply(CatalogsRequest request) {
+    return (ResultSetResponse) _apply(request);
+  }
+
+  @Override public ResultSetResponse apply(SchemasRequest request) {
+    return (ResultSetResponse) _apply(request);
+  }
+
+  @Override public ResultSetResponse apply(TablesRequest request) {
+    return (ResultSetResponse) _apply(request);
+  }
+
+  @Override public ResultSetResponse apply(TableTypesRequest request) {
+    return (ResultSetResponse) _apply(request);
+  }
+
+  @Override public ResultSetResponse apply(TypeInfoRequest request) {
+    return (ResultSetResponse) _apply(request);
+  }
+
+  @Override public ResultSetResponse apply(ColumnsRequest request) {
+    return (ResultSetResponse) _apply(request);
+  }
+
+  @Override public PrepareResponse apply(PrepareRequest request) {
+    return finagle((PrepareResponse) _apply(request));
+  }
+
+  @Override public ExecuteResponse apply(PrepareAndExecuteRequest request) {
+    return finagle((ExecuteResponse) _apply(request));
+  }
+
+  @Override public FetchResponse apply(FetchRequest request) {
+    return (FetchResponse) _apply(request);
+  }
+
+  @Override public CreateStatementResponse apply(CreateStatementRequest request) {
+    return (CreateStatementResponse) _apply(request);
+  }
+
+  @Override public CloseStatementResponse apply(CloseStatementRequest request) {
+    return (CloseStatementResponse) _apply(request);
+  }
+
+  @Override public CloseConnectionResponse apply(CloseConnectionRequest request) {
+    return (CloseConnectionResponse) _apply(request);
+  }
+
+  @Override public ConnectionSyncResponse apply(ConnectionSyncRequest request) {
+    return (ConnectionSyncResponse) _apply(request);
+  }
+
+  @Override public DatabasePropertyResponse apply(DatabasePropertyRequest request) {
+    return (DatabasePropertyResponse) _apply(request);
+  }
+
+  /**
+   * Determines whether the given message has the field, denoted by the provided number, set.
+   * NOTE(review): the lookup result is passed on unchecked; confirm callers use valid numbers.
+   * @param msg The protobuf message
+   * @param desc The descriptor for the message
+   * @param fieldNum The identifier for the field
+   * @return True if the message contains the field, false otherwise
+   */
+  public static boolean hasField(Message msg, Descriptor desc, int fieldNum) {
+    return msg.hasField(desc.findFieldByNumber(fieldNum));
+  }
+}
+
+// End ProtobufService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
new file mode 100644
index 0000000..acb82db
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.remote.Service.Request;
+import org.apache.calcite.avatica.remote.Service.Response;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import java.io.IOException;
+
+/**
+ * Generic interface to support parsing of serialized protocol buffers between client and server.
+ */
+public interface ProtobufTranslation {
+
+  /**
+   * Serializes a {@link Response} as a protocol buffer and returns the resulting bytes.
+   *
+   * @param response The response to serialize
+   * @throws IOException If there are errors during serialization
+   */
+  byte[] serializeResponse(Response response) throws IOException;
+
+  /**
+   * Serializes a {@link Request} as a protocol buffer and returns the resulting bytes.
+   *
+   * @param request The request to serialize
+   * @throws IOException If there are errors during serialization
+   */
+  byte[] serializeRequest(Request request) throws IOException;
+
+  /**
+   * Parses a serialized protocol buffer request into a {@link Request}.
+   *
+   * @param bytes Serialized protocol buffer request from client
+   * @return A Request object for the given bytes
+   * @throws InvalidProtocolBufferException If the protocol buffer cannot be deserialized
+   */
+  Request parseRequest(byte[] bytes) throws InvalidProtocolBufferException;
+
+  /**
+   * Parses a serialized protocol buffer response into a {@link Response}.
+   *
+   * @param bytes Serialized protocol buffer response from server
+   * @return The Response object for the given bytes
+   * @throws InvalidProtocolBufferException If the protocol buffer cannot be deserialized
+   */
+  Response parseResponse(byte[] bytes) throws InvalidProtocolBufferException;
+}
+
+// End ProtobufTranslation.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
new file mode 100644
index 0000000..5a4cc11
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.proto.Common.WireMessage;
+import org.apache.calcite.avatica.proto.Requests.CatalogsRequest;
+import org.apache.calcite.avatica.proto.Requests.CloseConnectionRequest;
+import org.apache.calcite.avatica.proto.Requests.CloseStatementRequest;
+import org.apache.calcite.avatica.proto.Requests.ColumnsRequest;
+import org.apache.calcite.avatica.proto.Requests.ConnectionSyncRequest;
+import org.apache.calcite.avatica.proto.Requests.CreateStatementRequest;
+import org.apache.calcite.avatica.proto.Requests.DatabasePropertyRequest;
+import org.apache.calcite.avatica.proto.Requests.FetchRequest;
+import org.apache.calcite.avatica.proto.Requests.PrepareAndExecuteRequest;
+import org.apache.calcite.avatica.proto.Requests.PrepareRequest;
+import org.apache.calcite.avatica.proto.Requests.SchemasRequest;
+import org.apache.calcite.avatica.proto.Requests.TableTypesRequest;
+import org.apache.calcite.avatica.proto.Requests.TablesRequest;
+import org.apache.calcite.avatica.proto.Requests.TypeInfoRequest;
+import org.apache.calcite.avatica.proto.Responses.CloseConnectionResponse;
+import org.apache.calcite.avatica.proto.Responses.CloseStatementResponse;
+import org.apache.calcite.avatica.proto.Responses.ConnectionSyncResponse;
+import org.apache.calcite.avatica.proto.Responses.CreateStatementResponse;
+import org.apache.calcite.avatica.proto.Responses.DatabasePropertyResponse;
+import org.apache.calcite.avatica.proto.Responses.ExecuteResponse;
+import org.apache.calcite.avatica.proto.Responses.FetchResponse;
+import org.apache.calcite.avatica.proto.Responses.PrepareResponse;
+import org.apache.calcite.avatica.proto.Responses.ResultSetResponse;
+import org.apache.calcite.avatica.remote.Service.Request;
+import org.apache.calcite.avatica.remote.Service.Response;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ProtobufTranslation} that translates between serialized
+ * protocol buffer messages and the Avatica POJO requests and responses.
+ *
+ * <p>Every message is written wrapped in a {@link WireMessage} that records the class name
+ * of the wrapped protocol buffer message; when parsing, that name selects the translator.
+ */
+public class ProtobufTranslationImpl implements ProtobufTranslation {
+
+  // Extremely ugly mapping of PB class name into a means to convert it to the POJO
+  private static final Map<String, RequestTranslator> REQUEST_PARSERS;
+  private static final Map<String, ResponseTranslator> RESPONSE_PARSERS;
+
+  static {
+    // Each translator pairs the protobuf parser for a message class with an instance of
+    // the matching POJO, keyed by the protobuf class name that travels in the WireMessage.
+    Map<String, RequestTranslator> reqParsers = new HashMap<>();
+    reqParsers.put(CatalogsRequest.class.getName(),
+        new RequestTranslator(CatalogsRequest.PARSER, new Service.CatalogsRequest()));
+    reqParsers.put(CloseConnectionRequest.class.getName(),
+        new RequestTranslator(CloseConnectionRequest.PARSER, new Service.CloseConnectionRequest()));
+    reqParsers.put(CloseStatementRequest.class.getName(),
+        new RequestTranslator(CloseStatementRequest.PARSER, new Service.CloseStatementRequest()));
+    reqParsers.put(ColumnsRequest.class.getName(),
+        new RequestTranslator(ColumnsRequest.PARSER, new Service.ColumnsRequest()));
+    reqParsers.put(ConnectionSyncRequest.class.getName(),
+        new RequestTranslator(ConnectionSyncRequest.PARSER, new Service.ConnectionSyncRequest()));
+    reqParsers.put(CreateStatementRequest.class.getName(),
+        new RequestTranslator(CreateStatementRequest.PARSER, new Service.CreateStatementRequest()));
+    reqParsers.put(DatabasePropertyRequest.class.getName(),
+        new RequestTranslator(DatabasePropertyRequest.PARSER,
+            new Service.DatabasePropertyRequest()));
+    reqParsers.put(FetchRequest.class.getName(),
+        new RequestTranslator(FetchRequest.PARSER, new Service.FetchRequest()));
+    reqParsers.put(PrepareAndExecuteRequest.class.getName(),
+        new RequestTranslator(PrepareAndExecuteRequest.PARSER,
+            new Service.PrepareAndExecuteRequest()));
+    reqParsers.put(PrepareRequest.class.getName(),
+        new RequestTranslator(PrepareRequest.PARSER, new Service.PrepareRequest()));
+    reqParsers.put(SchemasRequest.class.getName(),
+        new RequestTranslator(SchemasRequest.PARSER, new Service.SchemasRequest()));
+    reqParsers.put(TablesRequest.class.getName(),
+        new RequestTranslator(TablesRequest.PARSER, new Service.TablesRequest()));
+    reqParsers.put(TableTypesRequest.class.getName(),
+        new RequestTranslator(TableTypesRequest.PARSER, new Service.TableTypesRequest()));
+    reqParsers.put(TypeInfoRequest.class.getName(),
+        new RequestTranslator(TypeInfoRequest.PARSER, new Service.TypeInfoRequest()));
+
+    REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers);
+
+    Map<String, ResponseTranslator> respParsers = new HashMap<>();
+    respParsers.put(CloseConnectionResponse.class.getName(),
+        new ResponseTranslator(CloseConnectionResponse.PARSER,
+            new Service.CloseConnectionResponse()));
+    respParsers.put(CloseStatementResponse.class.getName(),
+        new ResponseTranslator(CloseStatementResponse.PARSER,
+            new Service.CloseStatementResponse()));
+    respParsers.put(ConnectionSyncResponse.class.getName(),
+        new ResponseTranslator(ConnectionSyncResponse.PARSER,
+            new Service.ConnectionSyncResponse()));
+    respParsers.put(CreateStatementResponse.class.getName(),
+        new ResponseTranslator(CreateStatementResponse.PARSER,
+            new Service.CreateStatementResponse()));
+    respParsers.put(DatabasePropertyResponse.class.getName(),
+        new ResponseTranslator(DatabasePropertyResponse.PARSER,
+            new Service.DatabasePropertyResponse()));
+    respParsers.put(ExecuteResponse.class.getName(),
+        new ResponseTranslator(ExecuteResponse.PARSER, new Service.ExecuteResponse()));
+    respParsers.put(FetchResponse.class.getName(),
+        new ResponseTranslator(FetchResponse.PARSER, new Service.FetchResponse()));
+    respParsers.put(PrepareResponse.class.getName(),
+        new ResponseTranslator(PrepareResponse.PARSER, new Service.PrepareResponse()));
+    respParsers.put(ResultSetResponse.class.getName(),
+        new ResponseTranslator(ResultSetResponse.PARSER, new Service.ResultSetResponse()));
+
+    RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers);
+  }
+
+  /**
+   * Looks up the translator registered for a protocol buffer request class.
+   *
+   * @param className The protocol buffer class name
+   * @return The translator for the class
+   * @throws IllegalArgumentException If the argument is null or if a translator for the given
+   *     class name is not found.
+   */
+  public static RequestTranslator getParserForRequest(String className) {
+    if (null == className) {
+      throw new IllegalArgumentException("Cannot fetch parser for null class name");
+    }
+
+    RequestTranslator translator = REQUEST_PARSERS.get(className);
+    if (null == translator) {
+      throw new IllegalArgumentException("Cannot find parser for " + className);
+    }
+
+    return translator;
+  }
+
+  /**
+   * Looks up the translator registered for a protocol buffer response class.
+   *
+   * @param className The protocol buffer class name
+   * @return The translator for the class
+   * @throws IllegalArgumentException If the argument is null or if a translator for the given
+   *     class name is not found.
+   */
+  public static ResponseTranslator getParserForResponse(String className) {
+    if (null == className) {
+      throw new IllegalArgumentException("Cannot fetch parser for null class name");
+    }
+
+    ResponseTranslator translator = RESPONSE_PARSERS.get(className);
+    if (null == translator) {
+      throw new IllegalArgumentException("Cannot find parser for " + className);
+    }
+
+    return translator;
+  }
+
+  @Override public byte[] serializeResponse(Response response) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Message responseMsg = response.serialize();
+    serializeMessage(out, responseMsg);
+    return out.toByteArray();
+  }
+
+  @Override public byte[] serializeRequest(Request request) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Message requestMsg = request.serialize();
+    serializeMessage(out, requestMsg);
+    return out.toByteArray();
+  }
+
+  /**
+   * Wraps the given message in a {@link WireMessage}, recording the message's class name
+   * alongside its serialized bytes, and writes the wrapper to the stream.
+   *
+   * @param out The stream the wrapped message is written to
+   * @param msg The protocol buffer message to wrap
+   * @throws IOException If the wrapper cannot be written to the stream
+   */
+  void serializeMessage(OutputStream out, Message msg) throws IOException {
+    // Serialize straight into a ByteString. Writing to a temporary stream and then calling
+    // ByteString.copyFrom() on its array would copy the message's bytes two more times.
+    ByteString wrappedBytes = msg.toByteString();
+
+    WireMessage wireMsg = WireMessage.newBuilder()
+        .setName(msg.getClass().getName())
+        .setWrappedMessage(wrappedBytes)
+        .build();
+
+    wireMsg.writeTo(out);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IllegalArgumentException If no translator is registered for the message name
+   *     carried by the wire message
+   */
+  @Override public Request parseRequest(byte[] bytes) throws InvalidProtocolBufferException {
+    WireMessage wireMsg = WireMessage.parseFrom(bytes);
+
+    // The name stored in the wrapper tells us how to interpret the wrapped bytes.
+    String serializedMessageClassName = wireMsg.getName();
+    RequestTranslator translator = getParserForRequest(serializedMessageClassName);
+
+    return translator.transform(wireMsg.getWrappedMessage());
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IllegalArgumentException If no translator is registered for the message name
+   *     carried by the wire message
+   */
+  @Override public Response parseResponse(byte[] bytes) throws InvalidProtocolBufferException {
+    WireMessage wireMsg = WireMessage.parseFrom(bytes);
+
+    // The name stored in the wrapper tells us how to interpret the wrapped bytes.
+    String serializedMessageClassName = wireMsg.getName();
+    ResponseTranslator translator = getParserForResponse(serializedMessageClassName);
+
+    return translator.transform(wireMsg.getWrappedMessage());
+  }
+}
+
+// End ProtobufTranslationImpl.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index b5404dc..deacdee 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -20,7 +20,6 @@ import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaParameter;
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.ConnectionPropertiesImpl;
-import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.MetaImpl;
 
 import java.sql.SQLException;
@@ -168,8 +167,7 @@ class RemoteMeta extends MetaImpl {
   @Override public ExecuteResult prepareAndExecute(StatementHandle h,
       String sql, long maxRowCount, PrepareCallback callback) {
     // sync connection state if necessary
-    connectionSync(new ConnectionHandle(h.connectionId),
-      new ConnectionPropertiesImpl());
+    connectionSync(new ConnectionHandle(h.connectionId), new ConnectionPropertiesImpl());
     final Service.ExecuteResponse response;
     try {
       synchronized (callback.getMonitor()) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
new file mode 100644
index 0000000..cb1a468
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * ProtobufService implementation that queries against a remote implementation, using
+ * protocol buffers as the serialized form.
+ *
+ * <p>Each request is sent as the body of an HTTP POST to the configured URL, and the body
+ * of the reply is parsed as the response.
+ */
+public class RemoteProtobufService extends ProtobufService {
+  private final URL url;
+  private final ProtobufTranslation translation;
+
+  /**
+   * Creates a service that talks to the given URL.
+   *
+   * @param url The URL requests are POSTed to
+   * @param translation Converts requests and responses to and from their serialized form
+   */
+  public RemoteProtobufService(URL url, ProtobufTranslation translation) {
+    this.url = url;
+    this.translation = translation;
+  }
+
+  /**
+   * Sends the serialized request over HTTP and parses the reply.
+   *
+   * @param request The request to send
+   * @return The response parsed from the body of the HTTP reply
+   * @throws RuntimeException If the HTTP response code is not 200 (OK), or wrapping any
+   *     {@link IOException} raised while sending, receiving, serializing or parsing
+   */
+  @Override public Response _apply(Request request) {
+    try {
+      final HttpURLConnection connection =
+          (HttpURLConnection) url.openConnection();
+      connection.setRequestMethod("POST");
+      connection.setDoInput(true);
+      connection.setDoOutput(true);
+      // The try-with-resources statement closes the stream, so no explicit close() is needed.
+      try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) {
+        // Convert the Request to a protobuf and send it over the wire
+        wr.write(translation.serializeRequest(request));
+        wr.flush();
+      }
+      final int responseCode = connection.getResponseCode();
+      if (responseCode != HttpURLConnection.HTTP_OK) {
+        throw new RuntimeException("response code " + responseCode);
+      }
+      // Read the (serialized protobuf) response off the wire and convert it back to a
+      // Response; the stream is closed even when reading or parsing fails.
+      try (InputStream inputStream = connection.getInputStream()) {
+        return translation.parseResponse(AvaticaUtils.readFullyToBytes(inputStream));
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
+
+// End RemoteProtobufService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
new file mode 100644
index 0000000..0dadc78
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+/**
+ * Pairs a protobuf parser with an Avatica POJO request so that the serialized form of a
+ * protobuf Request message can be turned into the POJO request.
+ */
+public class RequestTranslator {
+
+  private final Parser<? extends Message> parser;
+  private final Service.Request impl;
+
+  /**
+   * Creates a translator.
+   *
+   * @param parser Parses the serialized bytes into a protobuf message
+   * @param impl Request whose {@code deserialize} method receives the parsed message
+   */
+  public RequestTranslator(Parser<? extends Message> parser, Service.Request impl) {
+    this.impl = impl;
+    this.parser = parser;
+  }
+
+  /**
+   * Parses the serialized message and hands the result to the request's
+   * {@code deserialize} method.
+   *
+   * @param serializedMessage The serialized protobuf message
+   * @return Whatever {@code deserialize} returns for the parsed message
+   * @throws InvalidProtocolBufferException If the bytes cannot be parsed
+   */
+  public Service.Request transform(ByteString serializedMessage)
+      throws InvalidProtocolBufferException {
+    return impl.deserialize(parser.parseFrom(serializedMessage));
+  }
+}
+
+// End RequestTranslator.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java
new file mode 100644
index 0000000..0311e13
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+/**
+ * Pairs a protobuf parser with an Avatica POJO response so that the serialized form of a
+ * protobuf Response message can be turned into the POJO response.
+ */
+public class ResponseTranslator {
+
+  private final Parser<? extends Message> parser;
+  private final Service.Response impl;
+
+  /**
+   * Creates a translator.
+   *
+   * @param parser Parses the serialized bytes into a protobuf message
+   * @param impl Response whose {@code deserialize} method receives the parsed message
+   */
+  public ResponseTranslator(Parser<? extends Message> parser, Service.Response impl) {
+    this.impl = impl;
+    this.parser = parser;
+  }
+
+  /**
+   * Parses the serialized message and hands the result to the response's
+   * {@code deserialize} method.
+   *
+   * @param serializedMessage The serialized protobuf message
+   * @return Whatever {@code deserialize} returns for the parsed message
+   * @throws InvalidProtocolBufferException If the bytes cannot be parsed
+   */
+  public Service.Response transform(ByteString serializedMessage)
+      throws InvalidProtocolBufferException {
+    return impl.deserialize(parser.parseFrom(serializedMessage));
+  }
+}
+
+// End ResponseTranslator.java