You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/03/07 19:28:14 UTC

[36/59] [partial] calcite git commit: [CALCITE-1078] Detach avatica from the core calcite Maven project

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
new file mode 100644
index 0000000..11a6104
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Mock implementation of {@link Service}
+ * that encodes its requests and responses as JSON
+ * and looks up responses from a pre-defined map.
+ */
+public class MockJsonService extends JsonService {
+  /** Canned responses, keyed by the exact JSON text of the request. */
+  private final Map<String, String> map;
+
+  /**
+   * Creates a mock service backed by the given lookup table.
+   *
+   * @param map JSON request text mapped to JSON response text; the reference
+   *     is stored as-is, not copied
+   */
+  public MockJsonService(Map<String, String> map) {
+    this.map = map;
+  }
+
+  /**
+   * Returns the canned response registered for {@code request}.
+   *
+   * @param request JSON request text; looked up as a map key, so it must equal
+   *     a registered key character for character
+   * @return the JSON response text registered for that request
+   * @throws RuntimeException if the map yields no response for the request
+   */
+  @Override public String apply(String request) {
+    String response = map.get(request);
+    if (response == null) {
+      throw new RuntimeException("No response for " + request);
+    }
+    return response;
+  }
+
+  /** Factory that creates a {@code MockJsonService}. */
+  public static class Factory implements Service.Factory {
+    /**
+     * Builds a mock service pre-loaded with a fixed set of request/response
+     * pairs. Several request keys embed the id of the given connection.
+     *
+     * @param connection the connection whose {@code id} is embedded in the keys
+     * @return a new {@code MockJsonService} holding the canned pairs
+     * @throws RuntimeException wrapping any {@link IOException} raised while
+     *     building the table
+     */
+    @Override public Service create(AvaticaConnection connection) {
+      final String connectionId = connection.id;
+      final Map<String, String> map1 = new HashMap<>();
+      try {
+        map1.put(
+            "{\"request\":\"openConnection\",\"connectionId\":\"" + connectionId + "\",\"info\":{}}",
+            "{\"response\":\"openConnection\"}");
+        map1.put(
+            "{\"request\":\"closeConnection\",\"connectionId\":\"" + connectionId + "\"}",
+            "{\"response\":\"closeConnection\"}");
+        // Hand-written getSchemas key; note that it carries no connectionId field.
+        map1.put(
+            "{\"request\":\"getSchemas\",\"catalog\":null,\"schemaPattern\":{\"s\":null}}",
+            "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}");
+        // Same response, keyed by whatever JsonService.encode produces for the request object.
+        map1.put(
+            JsonService.encode(new SchemasRequest(connectionId, null, null)),
+            "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}");
+        map1.put(
+            JsonService.encode(
+                new TablesRequest(connectionId, null, null, null, Arrays.<String>asList())),
+            "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}");
+        map1.put(
+            "{\"request\":\"createStatement\",\"connectionId\":\"" + connectionId + "\"}",
+            "{\"response\":\"createStatement\",\"id\":0}");
+        // Prepare-and-execute: the response carries both the signature and the rows.
+        map1.put(
+            "{\"request\":\"prepareAndExecute\",\"statementId\":0,"
+                + "\"sql\":\"select * from (\\n  values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)\",\"maxRowCount\":-1}",
+            "{\"response\":\"resultSet\", updateCount: -1, \"signature\": {\n"
+                + " \"columns\": [\n"
+                + "   {\"columnName\": \"C1\", \"type\": {type: \"scalar\", id: 4, rep: \"INTEGER\"}},\n"
+                + "   {\"columnName\": \"C2\", \"type\": {type: \"scalar\", id: 12, rep: \"STRING\"}}\n"
+                + " ], \"cursorFactory\": {\"style\": \"ARRAY\"}\n"
+                + "}, \"rows\": [[1, \"a\"], [null, \"b\"], [3, \"c\"]]}");
+        // Prepare only: the response carries the signature but no rows.
+        map1.put(
+            "{\"request\":\"prepare\",\"statementId\":0,"
+                + "\"sql\":\"select * from (\\n  values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)\",\"maxRowCount\":-1}",
+            "{\"response\":\"prepare\",\"signature\": {\n"
+                + " \"columns\": [\n"
+                + "   {\"columnName\": \"C1\", \"type\": {type: \"scalar\", id: 4, rep: \"INTEGER\"}},\n"
+                + "   {\"columnName\": \"C2\", \"type\": {type: \"scalar\", id: 12, rep: \"STRING\"}}\n"
+                + " ],\n"
+                + " \"parameters\": [],\n"
+                + " \"cursorFactory\": {\"style\": \"ARRAY\"}\n"
+                + "}}");
+        // Column metadata for "my_table": a single row, ["my_table", 10].
+        map1.put(
+            "{\"request\":\"getColumns\",\"connectionId\":\"" + connectionId + "\",\"catalog\":null,\"schemaPattern\":null,"
+                + "\"tableNamePattern\":\"my_table\",\"columnNamePattern\":null}",
+            "{\"response\":\"resultSet\",\"connectionId\":\"00000000-0000-0000-0000-000000000000\",\"statementId\":-1,\"ownStatement\":true,"
+                + "\"signature\":{\"columns\":["
+                  + "{\"ordinal\":0,\"autoIncrement\":false,\"caseSensitive\":false,\"searchable\":true,\"currency\":false,\"nullable\":1,\"signed\":false,"
+                    + "\"displaySize\":40,\"label\":\"TABLE_NAME\",\"columnName\":\"TABLE_NAME\",\"schemaName\":\"\",\"precision\":0,\"scale\":0,\"tableName\":\"SYSTEM.TABLE\","
+                    + "\"catalogName\":\"\",\"type\":{\"type\":\"scalar\",\"id\":12,\"name\":\"VARCHAR\",\"rep\":\"STRING\"},\"readOnly\":true,\"writable\":false,"
+                    + "\"definitelyWritable\":false,\"columnClassName\":\"java.lang.String\"},"
+                  + "{\"ordinal\":1,\"autoIncrement\":false,\"caseSensitive\":false,\"searchable\":true,\"currency\":false,\"nullable\":1,\"signed\":true,"
+                    + "\"displaySize\":40,\"label\":\"ORDINAL_POSITION\",\"columnName\":\"ORDINAL_POSITION\",\"schemaName\":\"\",\"precision\":0,\"scale\":0,"
+                    + "\"tableName\":\"SYSTEM.TABLE\",\"catalogName\":\"\",\"type\":{\"type\":\"scalar\",\"id\":-5,\"name\":\"BIGINT\",\"rep\":\"PRIMITIVE_LONG\"},"
+                    + "\"readOnly\":true,\"writable\":false,\"definitelyWritable\":false,\"columnClassName\":\"java.lang.Long\"}"
+                + "],\"sql\":null,"
+                + "\"parameters\":[],"
+                + "\"cursorFactory\":{\"style\":\"LIST\",\"clazz\":null,\"fieldNames\":null},\"statementType\":null},"
+                + "\"firstFrame\":{\"offset\":0,\"done\":true,"
+                + "\"rows\":[[\"my_table\",10]]"
+                + "},\"updateCount\":-1}");
+      } catch (IOException e) {
+        // Presumably raised by JsonService.encode; rethrown unchecked with the cause kept.
+        throw new RuntimeException(e);
+      }
+      return new MockJsonService(map1);
+    }
+  }
+}
+
+// End MockJsonService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
new file mode 100644
index 0000000..2fcb19a
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.MetaImpl;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A mock implementation of ProtobufService for testing.
+ *
+ * <p>It performs no serialization of requests and responses.
+ */
+public class MockProtobufService extends ProtobufService {
+
+  /** Connection id used as part of the key for most canned requests. */
+  private final String connectionId;
+  /** Read-only table from request to canned response. */
+  private final Map<Request, Response> mapping;
+
+  /**
+   * Creates a mock whose canned requests are keyed on {@code connectionId}.
+   *
+   * @param connectionId the connection id placed in the canned requests
+   */
+  public MockProtobufService(String connectionId) {
+    this.connectionId = connectionId;
+    // Order matters: createMapping() reads the connectionId field assigned above.
+    this.mapping = createMapping();
+  }
+
+  /**
+   * Builds the fixed table of request/response pairs served by this mock.
+   *
+   * @return an unmodifiable view of the request-to-response table
+   */
+  private Map<Request, Response> createMapping() {
+    final Map<Request, Response> canned = new HashMap<>();
+
+    // Open a connection, passing an empty property map.
+    canned.put(
+        new OpenConnectionRequest(connectionId, new HashMap<String, String>()),
+        new OpenConnectionResponse());
+
+    // List schemas: answered with an empty frame.
+    // ownStatement=false just to avoid the extra close statement call.
+    canned.put(
+        new SchemasRequest(connectionId, null, null),
+        new ResultSetResponse(null, 1, false, null, Meta.Frame.EMPTY, -1, null));
+
+    // List tables: answered with an empty frame, i.e. no tables exist.
+    // ownStatement=false just to avoid the extra close statement call.
+    canned.put(
+        new TablesRequest(connectionId, null, null, null, Collections.<String>emptyList()),
+        new ResultSetResponse(null, 150, false, null, Meta.Frame.EMPTY, -1, null));
+
+    // Create a statement and get an id back.
+    // NOTE(review): keyed on the literal "0" rather than connectionId, unlike the
+    // other entries -- confirm this is intended.
+    canned.put(
+        new CreateStatementRequest("0"),
+        new CreateStatementResponse("0", 1, null));
+
+    // Prepare and execute a query in one step: schema and values are returned.
+    // NOTE(review): "\\n" in the SQL literal is a backslash followed by 'n', not a
+    // newline -- confirm it matches what callers actually send.
+    // NOTE(review): ProtobufService.apply(PrepareAndExecuteRequest) casts the result
+    // of _apply to ExecuteResponse -- confirm a ResultSetResponse satisfies that cast.
+    final Request prepareAndExecute = new PrepareAndExecuteRequest(connectionId, 1,
+        "select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)", -1);
+    final Response prepareAndExecuteResult = new ResultSetResponse("0", 1, true,
+        Meta.Signature.create(
+            Arrays.<ColumnMetaData>asList(
+                MetaImpl.columnMetaData("C1", 0, Integer.class),
+                MetaImpl.columnMetaData("C2", 1, String.class)),
+            null, null, Meta.CursorFactory.ARRAY, Meta.StatementType.SELECT),
+        Meta.Frame.create(0, true,
+            Arrays.<Object>asList(new Object[] {1, "a"},
+                new Object[] {null, "b"}, new Object[] {3, "c"})),
+        -1, null);
+    canned.put(prepareAndExecute, prepareAndExecuteResult);
+
+    // Prepare a query: the result schema is returned, but no values.
+    // NOTE(review): this SQL text differs from the prepare-and-execute text above
+    // ("values(" without a space, and a comma before "as t") -- confirm intended.
+    // NOTE(review): ProtobufService.apply(PrepareRequest) casts the result of _apply
+    // to PrepareResponse -- confirm a ResultSetResponse satisfies that cast.
+    final Request prepare = new PrepareRequest(connectionId,
+        "select * from (\\n values(1, 'a'), (null, 'b'), (3, 'c')), as t (c1, c2)", -1);
+    final Response prepareResult = new ResultSetResponse("0", 1, true,
+        Meta.Signature.create(
+            Arrays.<ColumnMetaData>asList(
+                MetaImpl.columnMetaData("C1", 0, Integer.class),
+                MetaImpl.columnMetaData("C2", 1, String.class)),
+            null, Collections.<AvaticaParameter>emptyList(),
+            Meta.CursorFactory.ARRAY, Meta.StatementType.SELECT),
+        null, -1, null);
+    canned.put(prepare, prepareResult);
+
+    // Columns of "my_table": the frame holds one element, which wraps the pair
+    // {"my_table", 10}.
+    final Request columns = new ColumnsRequest(connectionId, null, null, "my_table", null);
+    final Response columnsResult =
+        new ResultSetResponse("00000000-0000-0000-0000-000000000000", -1, true,
+            Meta.Signature.create(
+                Arrays.<ColumnMetaData>asList(
+                    MetaImpl.columnMetaData("TABLE_NAME", 0, String.class),
+                    MetaImpl.columnMetaData("ORDINAL_POSITION", 1, Long.class)),
+                null, Collections.<AvaticaParameter>emptyList(),
+                Meta.CursorFactory.ARRAY, null),
+            Meta.Frame.create(0, true,
+                Arrays.<Object>asList(new Object[] {new Object[]{"my_table", 10}})),
+            -1, null);
+    canned.put(columns, columnsResult);
+
+    return Collections.unmodifiableMap(canned);
+  }
+
+  /**
+   * Answers a request from the canned table. Close-connection requests are
+   * the exception: they always get a fresh {@code CloseConnectionResponse}
+   * without consulting the table.
+   *
+   * @param request the client's request
+   * @return the response for the request
+   * @throws RuntimeException if the request is not a close-connection request
+   *     and has no entry in the table
+   */
+  @Override public Response _apply(Request request) {
+    return request instanceof CloseConnectionRequest
+        ? new CloseConnectionResponse()
+        : dispatch(request);
+  }
+
+  /**
+   * Looks up the canned response for a request.
+   *
+   * @param request the client's request
+   * @return the response stored for that request
+   * @throws RuntimeException if the table has no response for the request
+   */
+  private Response dispatch(Request request) {
+    final Response found = mapping.get(request);
+    if (found != null) {
+      return found;
+    }
+    throw new RuntimeException("Had no response mapping for " + request);
+  }
+
+  /**
+   * Factory producing a {@code MockProtobufService} bound to a connection's id.
+   */
+  public static class MockProtobufServiceFactory implements Service.Factory {
+    @Override public Service create(AvaticaConnection connection) {
+      final String id = connection.id;
+      return new MockProtobufService(id);
+    }
+  }
+}
+
+// End MockProtobufService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java
new file mode 100644
index 0000000..89e380e
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.Timer;
+import org.apache.calcite.avatica.metrics.Timer.Context;
+import org.apache.calcite.avatica.remote.Service.Response;
+
+import java.io.IOException;
+
+/**
+ * Dispatches serialized protocol buffer messages to the provided {@link Service}
+ * by converting them to the POJO Request. Returns back the serialized protocol
+ * buffer response.
+ */
+public class ProtobufHandler extends AbstractHandler<byte[]> {
+
+  /** Converts between serialized bytes and request/response objects. */
+  private final ProtobufTranslation translation;
+  /** Metrics system from which the serialization timer is obtained. */
+  private final MetricsSystem metrics;
+  /** One timer shared by request decoding and response encoding. */
+  private final Timer serializationTimer;
+
+  /**
+   * Creates a handler that dispatches to {@code service}.
+   *
+   * @param service the service handed to the superclass constructor
+   * @param translation converter used by {@link #decode} and {@link #encode}
+   * @param metrics source of the timer used around serialization work
+   */
+  public ProtobufHandler(Service service, ProtobufTranslation translation, MetricsSystem metrics) {
+    super(service);
+    this.translation = translation;
+    this.metrics = metrics;
+    this.serializationTimer = metrics.getTimer(
+        MetricsHelper.concat(ProtobufHandler.class, HANDLER_SERIALIZATION_METRICS_NAME));
+  }
+
+  /** Delegates unchanged to the superclass implementation. */
+  @Override public HandlerResponse<byte[]> apply(byte[] requestBytes) {
+    final HandlerResponse<byte[]> handled = super.apply(requestBytes);
+    return handled;
+  }
+
+  /**
+   * Parses request bytes inside a timer context; the context is closed when
+   * the try block exits (presumably stopping the timer).
+   *
+   * @param serializedRequest the serialized protocol buffer request
+   * @return the request produced by the translation
+   * @throws IOException if the translation fails to parse the bytes
+   */
+  @Override Service.Request decode(byte[] serializedRequest) throws IOException {
+    try (Context ignored = serializationTimer.start()) {
+      return translation.parseRequest(serializedRequest);
+    }
+  }
+
+  /**
+   * Serializes a response inside a timer context; the context is closed when
+   * the try block exits (presumably stopping the timer).
+   *
+   * @param response the response to serialize
+   * @return the bytes produced by the translation
+   * @throws IOException if the translation fails to serialize the response
+   */
+  @Override byte[] encode(Response response) throws IOException {
+    try (Context ignored = serializationTimer.start()) {
+      return translation.serializeResponse(response);
+    }
+  }
+}
+
+// End ProtobufHandler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
new file mode 100644
index 0000000..56ba125
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.google.protobuf.Message;
+
+/**
+ * Service implementation that encodes requests and responses as protocol buffers.
+ */
+public abstract class ProtobufService extends AbstractService {
+
+  /**
+   * Derived class should implement this method to transport requests and
+   * responses to and from the peer service.
+   *
+   * @param request the request to transport
+   * @return the response; each typed {@code apply} overload in this class casts
+   *     it to the response type that overload declares
+   */
+  public abstract Response _apply(Request request);
+
+  /** Reports {@code PROTOBUF} as this service's serialization type. */
+  @Override SerializationType getSerializationType() {
+    return SerializationType.PROTOBUF;
+  }
+
+  // Every typed apply() below forwards to _apply() and casts the result to the
+  // response type it declares, so a mismatched response fails with a
+  // ClassCastException. Result-set, prepare and execute responses are also passed
+  // through finagle(), which is not defined in this class (presumably inherited
+  // from AbstractService).
+
+  @Override public ResultSetResponse apply(CatalogsRequest request) {
+    return finagle((ResultSetResponse) _apply(request));
+  }
+
+  @Override public ResultSetResponse apply(SchemasRequest request) {
+    return finagle((ResultSetResponse) _apply(request));
+  }
+
+  @Override public ResultSetResponse apply(TablesRequest request) {
+    return finagle((ResultSetResponse) _apply(request));
+  }
+
+  @Override public ResultSetResponse apply(TableTypesRequest request) {
+    return finagle((ResultSetResponse) _apply(request));
+  }
+
+  @Override public ResultSetResponse apply(TypeInfoRequest request) {
+    return finagle((ResultSetResponse) _apply(request));
+  }
+
+  @Override public ResultSetResponse apply(ColumnsRequest request) {
+    return finagle((ResultSetResponse) _apply(request));
+  }
+
+  @Override public PrepareResponse apply(PrepareRequest request) {
+    return finagle((PrepareResponse) _apply(request));
+  }
+
+  @Override public ExecuteResponse apply(PrepareAndExecuteRequest request) {
+    return finagle((ExecuteResponse) _apply(request));
+  }
+
+  @Override public FetchResponse apply(FetchRequest request) {
+    return (FetchResponse) _apply(request);
+  }
+
+  @Override public CreateStatementResponse apply(CreateStatementRequest request) {
+    return (CreateStatementResponse) _apply(request);
+  }
+
+  @Override public CloseStatementResponse apply(CloseStatementRequest request) {
+    return (CloseStatementResponse) _apply(request);
+  }
+
+  @Override public OpenConnectionResponse apply(OpenConnectionRequest request) {
+    return (OpenConnectionResponse) _apply(request);
+  }
+
+  @Override public CloseConnectionResponse apply(CloseConnectionRequest request) {
+    return (CloseConnectionResponse) _apply(request);
+  }
+
+  @Override public ConnectionSyncResponse apply(ConnectionSyncRequest request) {
+    return (ConnectionSyncResponse) _apply(request);
+  }
+
+  @Override public DatabasePropertyResponse apply(DatabasePropertyRequest request) {
+    return (DatabasePropertyResponse) _apply(request);
+  }
+
+  @Override public ExecuteResponse apply(ExecuteRequest request) {
+    return finagle((ExecuteResponse) _apply(request));
+  }
+
+  @Override public SyncResultsResponse apply(SyncResultsRequest request) {
+    return (SyncResultsResponse) _apply(request);
+  }
+
+  @Override public CommitResponse apply(CommitRequest request) {
+    return (CommitResponse) _apply(request);
+  }
+
+  @Override public RollbackResponse apply(RollbackRequest request) {
+    return (RollbackResponse) _apply(request);
+  }
+
+  /**
+   * Checks if the provided {@link Message} is an instance of the Class given by
+   * <code>expectedType</code>. Throws an IllegalArgumentException if the message is null or
+   * not of the expected type, otherwise, it returns the message cast as the expected type.
+   *
+   * @param msg A Protocol Buffer message.
+   * @param expectedType The expected type of the Protocol Buffer message.
+   * @return The msg cast to the concrete Message type.
+   * @throws IllegalArgumentException If msg is null or is not an instance of expectedType.
+   */
+  public static <T extends Message> T castProtobufMessage(Message msg, Class<T> expectedType) {
+    if (!expectedType.isInstance(msg)) {
+      // isInstance(null) is false, so msg may be null here; avoid calling getClass() on it.
+      throw new IllegalArgumentException("Expected instance of " + expectedType.getName()
+          + ", but got " + (null == msg ? "null" : msg.getClass().getName()));
+    }
+
+    return expectedType.cast(msg);
+  }
+}
+
+// End ProtobufService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
new file mode 100644
index 0000000..7142d59
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.remote.Service.Request;
+import org.apache.calcite.avatica.remote.Service.Response;
+
+import java.io.IOException;
+
+/**
+ * Generic interface to support parsing of serialized protocol buffers between client and server.
+ */
+public interface ProtobufTranslation {
+
+  /**
+   * Serializes a {@link Response} as a protocol buffer.
+   *
+   * @param response The response to serialize
+   * @return The serialized form of the response
+   * @throws IOException If there are errors during serialization
+   */
+  byte[] serializeResponse(Response response) throws IOException;
+
+  /**
+   * Serializes a {@link Request} as a protocol buffer.
+   *
+   * @param request The request to serialize
+   * @return The serialized form of the request
+   * @throws IOException If there are errors during serialization
+   */
+  byte[] serializeRequest(Request request) throws IOException;
+
+  /**
+   * Parses a serialized protocol buffer request into a {@link Request}.
+   *
+   * @param bytes Serialized protocol buffer request from client
+   * @return A Request object for the given bytes
+   * @throws IOException If the protocol buffer cannot be deserialized
+   */
+  Request parseRequest(byte[] bytes) throws IOException;
+
+  /**
+   * Parses a serialized protocol buffer response into a {@link Response}.
+   *
+   * @param bytes Serialized protocol buffer response from server
+   * @return The Response object for the given bytes
+   * @throws IOException If the protocol buffer cannot be deserialized
+   */
+  Response parseResponse(byte[] bytes) throws IOException;
+}
+
+// End ProtobufTranslation.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
new file mode 100644
index 0000000..80d2b22
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.proto.Common.WireMessage;
+import org.apache.calcite.avatica.proto.Requests.CatalogsRequest;
+import org.apache.calcite.avatica.proto.Requests.CloseConnectionRequest;
+import org.apache.calcite.avatica.proto.Requests.CloseStatementRequest;
+import org.apache.calcite.avatica.proto.Requests.ColumnsRequest;
+import org.apache.calcite.avatica.proto.Requests.CommitRequest;
+import org.apache.calcite.avatica.proto.Requests.ConnectionSyncRequest;
+import org.apache.calcite.avatica.proto.Requests.CreateStatementRequest;
+import org.apache.calcite.avatica.proto.Requests.DatabasePropertyRequest;
+import org.apache.calcite.avatica.proto.Requests.ExecuteRequest;
+import org.apache.calcite.avatica.proto.Requests.FetchRequest;
+import org.apache.calcite.avatica.proto.Requests.OpenConnectionRequest;
+import org.apache.calcite.avatica.proto.Requests.PrepareAndExecuteRequest;
+import org.apache.calcite.avatica.proto.Requests.PrepareRequest;
+import org.apache.calcite.avatica.proto.Requests.RollbackRequest;
+import org.apache.calcite.avatica.proto.Requests.SchemasRequest;
+import org.apache.calcite.avatica.proto.Requests.SyncResultsRequest;
+import org.apache.calcite.avatica.proto.Requests.TableTypesRequest;
+import org.apache.calcite.avatica.proto.Requests.TablesRequest;
+import org.apache.calcite.avatica.proto.Requests.TypeInfoRequest;
+import org.apache.calcite.avatica.proto.Responses.CloseConnectionResponse;
+import org.apache.calcite.avatica.proto.Responses.CloseStatementResponse;
+import org.apache.calcite.avatica.proto.Responses.CommitResponse;
+import org.apache.calcite.avatica.proto.Responses.ConnectionSyncResponse;
+import org.apache.calcite.avatica.proto.Responses.CreateStatementResponse;
+import org.apache.calcite.avatica.proto.Responses.DatabasePropertyResponse;
+import org.apache.calcite.avatica.proto.Responses.ErrorResponse;
+import org.apache.calcite.avatica.proto.Responses.ExecuteResponse;
+import org.apache.calcite.avatica.proto.Responses.FetchResponse;
+import org.apache.calcite.avatica.proto.Responses.OpenConnectionResponse;
+import org.apache.calcite.avatica.proto.Responses.PrepareResponse;
+import org.apache.calcite.avatica.proto.Responses.ResultSetResponse;
+import org.apache.calcite.avatica.proto.Responses.RollbackResponse;
+import org.apache.calcite.avatica.proto.Responses.RpcMetadata;
+import org.apache.calcite.avatica.proto.Responses.SyncResultsResponse;
+import org.apache.calcite.avatica.remote.Service.Request;
+import org.apache.calcite.avatica.remote.Service.Response;
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.HBaseZeroCopyByteString;
+import com.google.protobuf.Message;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Implementation of {@link ProtobufTranslation} that translates between
+ * serialized protobuf messages and POJO requests and responses.
+ */
+public class ProtobufTranslationImpl implements ProtobufTranslation {
+
+  // Extremely ugly mapping of PB class name into a means to convert it to the POJO
+  private static final Map<String, RequestTranslator> REQUEST_PARSERS;
+  private static final Map<String, ResponseTranslator> RESPONSE_PARSERS;
+  private static final Map<Class<?>, ByteString> MESSAGE_CLASSES;
+
+  static {
+    Map<String, RequestTranslator> reqParsers = new ConcurrentHashMap<>();
+    reqParsers.put(CatalogsRequest.class.getName(),
+        new RequestTranslator(CatalogsRequest.parser(), new Service.CatalogsRequest()));
+    reqParsers.put(OpenConnectionRequest.class.getName(),
+        new RequestTranslator(OpenConnectionRequest.parser(), new Service.OpenConnectionRequest()));
+    reqParsers.put(CloseConnectionRequest.class.getName(),
+        new RequestTranslator(CloseConnectionRequest.parser(),
+          new Service.CloseConnectionRequest()));
+    reqParsers.put(CloseStatementRequest.class.getName(),
+        new RequestTranslator(CloseStatementRequest.parser(), new Service.CloseStatementRequest()));
+    reqParsers.put(ColumnsRequest.class.getName(),
+        new RequestTranslator(ColumnsRequest.parser(), new Service.ColumnsRequest()));
+    reqParsers.put(ConnectionSyncRequest.class.getName(),
+        new RequestTranslator(ConnectionSyncRequest.parser(), new Service.ConnectionSyncRequest()));
+    reqParsers.put(CreateStatementRequest.class.getName(),
+        new RequestTranslator(CreateStatementRequest.parser(),
+          new Service.CreateStatementRequest()));
+    reqParsers.put(DatabasePropertyRequest.class.getName(),
+        new RequestTranslator(DatabasePropertyRequest.parser(),
+            new Service.DatabasePropertyRequest()));
+    reqParsers.put(FetchRequest.class.getName(),
+        new RequestTranslator(FetchRequest.parser(), new Service.FetchRequest()));
+    reqParsers.put(PrepareAndExecuteRequest.class.getName(),
+        new RequestTranslator(PrepareAndExecuteRequest.parser(),
+            new Service.PrepareAndExecuteRequest()));
+    reqParsers.put(PrepareRequest.class.getName(),
+        new RequestTranslator(PrepareRequest.parser(), new Service.PrepareRequest()));
+    reqParsers.put(SchemasRequest.class.getName(),
+        new RequestTranslator(SchemasRequest.parser(), new Service.SchemasRequest()));
+    reqParsers.put(TablesRequest.class.getName(),
+        new RequestTranslator(TablesRequest.parser(), new Service.TablesRequest()));
+    reqParsers.put(TableTypesRequest.class.getName(),
+        new RequestTranslator(TableTypesRequest.parser(), new Service.TableTypesRequest()));
+    reqParsers.put(TypeInfoRequest.class.getName(),
+        new RequestTranslator(TypeInfoRequest.parser(), new Service.TypeInfoRequest()));
+    reqParsers.put(ExecuteRequest.class.getName(),
+        new RequestTranslator(ExecuteRequest.parser(), new Service.ExecuteRequest()));
+    reqParsers.put(SyncResultsRequest.class.getName(),
+        new RequestTranslator(SyncResultsRequest.parser(), new Service.SyncResultsRequest()));
+    reqParsers.put(CommitRequest.class.getName(),
+        new RequestTranslator(CommitRequest.parser(), new Service.CommitRequest()));
+    reqParsers.put(RollbackRequest.class.getName(),
+        new RequestTranslator(RollbackRequest.parser(), new Service.RollbackRequest()));
+
+    REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers);
+
+    Map<String, ResponseTranslator> respParsers = new ConcurrentHashMap<>();
+    respParsers.put(OpenConnectionResponse.class.getName(),
+        new ResponseTranslator(OpenConnectionResponse.parser(),
+            new Service.OpenConnectionResponse()));
+    respParsers.put(CloseConnectionResponse.class.getName(),
+        new ResponseTranslator(CloseConnectionResponse.parser(),
+            new Service.CloseConnectionResponse()));
+    respParsers.put(CloseStatementResponse.class.getName(),
+        new ResponseTranslator(CloseStatementResponse.parser(),
+            new Service.CloseStatementResponse()));
+    respParsers.put(ConnectionSyncResponse.class.getName(),
+        new ResponseTranslator(ConnectionSyncResponse.parser(),
+            new Service.ConnectionSyncResponse()));
+    respParsers.put(CreateStatementResponse.class.getName(),
+        new ResponseTranslator(CreateStatementResponse.parser(),
+            new Service.CreateStatementResponse()));
+    respParsers.put(DatabasePropertyResponse.class.getName(),
+        new ResponseTranslator(DatabasePropertyResponse.parser(),
+            new Service.DatabasePropertyResponse()));
+    respParsers.put(ExecuteResponse.class.getName(),
+        new ResponseTranslator(ExecuteResponse.parser(), new Service.ExecuteResponse()));
+    respParsers.put(FetchResponse.class.getName(),
+        new ResponseTranslator(FetchResponse.parser(), new Service.FetchResponse()));
+    respParsers.put(PrepareResponse.class.getName(),
+        new ResponseTranslator(PrepareResponse.parser(), new Service.PrepareResponse()));
+    respParsers.put(ResultSetResponse.class.getName(),
+        new ResponseTranslator(ResultSetResponse.parser(), new Service.ResultSetResponse()));
+    respParsers.put(ErrorResponse.class.getName(),
+        new ResponseTranslator(ErrorResponse.parser(), new Service.ErrorResponse()));
+    respParsers.put(SyncResultsResponse.class.getName(),
+        new ResponseTranslator(SyncResultsResponse.parser(), new Service.SyncResultsResponse()));
+    respParsers.put(RpcMetadata.class.getName(),
+        new ResponseTranslator(RpcMetadata.parser(), new RpcMetadataResponse()));
+    respParsers.put(CommitResponse.class.getName(),
+        new ResponseTranslator(CommitResponse.parser(), new Service.CommitResponse()));
+    respParsers.put(RollbackResponse.class.getName(),
+        new ResponseTranslator(RollbackResponse.parser(), new Service.RollbackResponse()));
+
+    RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers);
+
+    Map<Class<?>, ByteString> messageClassNames = new ConcurrentHashMap<>();
+    for (Class<?> msgClz : getAllMessageClasses()) {
+      messageClassNames.put(msgClz, wrapClassName(msgClz));
+    }
+    MESSAGE_CLASSES = Collections.unmodifiableMap(messageClassNames);
+  }
+
+  private static List<Class<?>> getAllMessageClasses() {
+    List<Class<?>> messageClasses = new ArrayList<>();
+    messageClasses.add(CatalogsRequest.class);
+    messageClasses.add(CloseConnectionRequest.class);
+    messageClasses.add(CloseStatementRequest.class);
+    messageClasses.add(ColumnsRequest.class);
+    messageClasses.add(CommitRequest.class);
+    messageClasses.add(ConnectionSyncRequest.class);
+    messageClasses.add(CreateStatementRequest.class);
+    messageClasses.add(DatabasePropertyRequest.class);
+    messageClasses.add(ExecuteRequest.class);
+    messageClasses.add(FetchRequest.class);
+    messageClasses.add(OpenConnectionRequest.class);
+    messageClasses.add(PrepareAndExecuteRequest.class);
+    messageClasses.add(PrepareRequest.class);
+    messageClasses.add(RollbackRequest.class);
+    messageClasses.add(SchemasRequest.class);
+    messageClasses.add(SyncResultsRequest.class);
+    messageClasses.add(TableTypesRequest.class);
+    messageClasses.add(TablesRequest.class);
+    messageClasses.add(TypeInfoRequest.class);
+    messageClasses.add(CloseConnectionResponse.class);
+    messageClasses.add(CloseStatementResponse.class);
+    messageClasses.add(CommitResponse.class);
+    messageClasses.add(ConnectionSyncResponse.class);
+    messageClasses.add(CreateStatementResponse.class);
+    messageClasses.add(DatabasePropertyResponse.class);
+    messageClasses.add(ErrorResponse.class);
+    messageClasses.add(ExecuteResponse.class);
+    messageClasses.add(FetchResponse.class);
+    messageClasses.add(OpenConnectionResponse.class);
+    messageClasses.add(PrepareResponse.class);
+    messageClasses.add(ResultSetResponse.class);
+    messageClasses.add(RollbackResponse.class);
+    messageClasses.add(RpcMetadata.class);
+    messageClasses.add(SyncResultsResponse.class);
+
+    return messageClasses;
+  }
+
+  private static ByteString wrapClassName(Class<?> clz) {
+    return HBaseZeroCopyByteString.wrap(clz.getName().getBytes(UTF_8));
+  }
+
+  private final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer =
+      new ThreadLocal<UnsynchronizedBuffer>() {
+        @Override protected UnsynchronizedBuffer initialValue() {
+          return new UnsynchronizedBuffer();
+        }
+      };
+
+  /**
+   * Looks up the translator registered for a protobuf request class.
+   *
+   * @param className The protocol buffer class name
+   * @return The {@link RequestTranslator} registered under that class name
+   * @throws IllegalArgumentException If the argument is null or if no translator is
+   *     registered for the given class name.
+   */
+  public static RequestTranslator getParserForRequest(String className) {
+    if (className == null) {
+      throw new IllegalArgumentException("Cannot fetch parser for null class name");
+    }
+
+    final RequestTranslator found = REQUEST_PARSERS.get(className);
+    if (found != null) {
+      return found;
+    }
+
+    throw new IllegalArgumentException("Cannot find parser for " + className);
+  }
+
+  /**
+   * Looks up the translator registered for a protobuf response class.
+   *
+   * @param className The protocol buffer class name
+   * @return The {@link ResponseTranslator} registered under that class name
+   * @throws IllegalArgumentException If the argument is null or if no translator is
+   *     registered for the given class name.
+   */
+  public static ResponseTranslator getParserForResponse(String className) {
+    if (className == null) {
+      throw new IllegalArgumentException("Cannot fetch parser for null class name");
+    }
+
+    final ResponseTranslator found = RESPONSE_PARSERS.get(className);
+    if (found != null) {
+      return found;
+    }
+
+    throw new IllegalArgumentException("Cannot find parser for " + className);
+  }
+
+  /**
+   * Converts the response to its protobuf message, wraps it via
+   * {@link #serializeMessage(OutputStream, Message)} and returns the resulting bytes.
+   * The calling thread's buffer is always reset before returning.
+   */
+  @Override public byte[] serializeResponse(Response response) throws IOException {
+    // A ByteArrayOutputStream is avoided on purpose: its write methods are synchronized and
+    // that concurrency control is not needed here.
+    final UnsynchronizedBuffer scratch = threadLocalBuffer.get();
+    try {
+      serializeMessage(scratch, response.serialize());
+      return scratch.toArray();
+    } finally {
+      scratch.reset();
+    }
+  }
+
+  /**
+   * Converts the request to its protobuf message, wraps it via
+   * {@link #serializeMessage(OutputStream, Message)} and returns the resulting bytes.
+   * The calling thread's buffer is always reset before returning.
+   */
+  @Override public byte[] serializeRequest(Request request) throws IOException {
+    // A ByteArrayOutputStream is avoided on purpose: its write methods are synchronized and
+    // that concurrency control is not needed here.
+    final UnsynchronizedBuffer scratch = threadLocalBuffer.get();
+    try {
+      serializeMessage(scratch, request.serialize());
+      return scratch.toArray();
+    } finally {
+      scratch.reset();
+    }
+  }
+
+  void serializeMessage(OutputStream out, Message msg) throws IOException {
+    // Serialize the protobuf message
+    UnsynchronizedBuffer buffer = threadLocalBuffer.get();
+    ByteString serializedMsg;
+    try {
+      msg.writeTo(buffer);
+      // Make a bytestring from it
+      serializedMsg = HBaseZeroCopyByteString.wrap(buffer.toArray());
+    } finally {
+      buffer.reset();
+    }
+
+    // Wrap the serialized message in a WireMessage
+    WireMessage wireMsg = WireMessage.newBuilder().setNameBytes(getClassNameBytes(msg.getClass()))
+        .setWrappedMessage(serializedMsg).build();
+
+    // Write the WireMessage to the provided OutputStream
+    wireMsg.writeTo(out);
+  }
+
+  ByteString getClassNameBytes(Class<?> clz) {
+    ByteString byteString = MESSAGE_CLASSES.get(clz);
+    if (null == byteString) {
+      throw new IllegalArgumentException("Missing ByteString for " + clz.getName());
+    }
+    return byteString;
+  }
+
+  /**
+   * Parses the bytes as a {@link WireMessage}, selects the request translator by the class
+   * name carried in the wrapper, and transforms the wrapped message into a {@link Request}.
+   *
+   * @throws IllegalArgumentException If no translator is registered for the wrapped class name
+   */
+  @Override public Request parseRequest(byte[] bytes) throws IOException {
+    final CodedInputStream in = HBaseZeroCopyByteString.wrap(bytes).newCodedInput();
+    // Aliasing avoids an extra copy when extracting the serialized Request from the
+    // WireMessage.
+    in.enableAliasing(true);
+    final WireMessage envelope = WireMessage.parseFrom(in);
+
+    final RequestTranslator translator = getParserForRequest(envelope.getName());
+
+    // The ByteString should be logical offsets into the original byte array
+    return translator.transform(envelope.getWrappedMessage());
+  }
+
+  /**
+   * Parses the bytes as a {@link WireMessage}, selects the response translator by the class
+   * name carried in the wrapper, and transforms the wrapped message into a {@link Response}.
+   *
+   * @throws IllegalArgumentException If no translator is registered for the wrapped class name
+   */
+  @Override public Response parseResponse(byte[] bytes) throws IOException {
+    final CodedInputStream in = HBaseZeroCopyByteString.wrap(bytes).newCodedInput();
+    // Aliasing avoids an extra copy when extracting the serialized Response from the
+    // WireMessage.
+    in.enableAliasing(true);
+    final WireMessage envelope = WireMessage.parseFrom(in);
+
+    final ResponseTranslator translator = getParserForResponse(envelope.getName());
+
+    return translator.transform(envelope.getWrappedMessage());
+  }
+}
+
+// End ProtobufTranslationImpl.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
new file mode 100644
index 0000000..463985a
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.ConnectionPropertiesImpl;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.MissingResultsException;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.QueryState;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link org.apache.calcite.avatica.Meta} for the remote
+ * driver.
+ */
+class RemoteMeta extends MetaImpl {
+  final Service service;
+  final Map<String, ConnectionPropertiesImpl> propsMap = new HashMap<>();
+  private Map<DatabaseProperty, Object> databaseProperties;
+
+  /**
+   * Creates a RemoteMeta.
+   *
+   * @param connection Connection passed on to the {@code MetaImpl} constructor
+   * @param service Service to which every request made by this object is sent
+   */
+  public RemoteMeta(AvaticaConnection connection, Service service) {
+    super(connection);
+    this.service = service;
+  }
+
+  /**
+   * Converts a service response into a {@link MetaResultSet}.
+   *
+   * <p>A response whose update count is not -1 becomes a count-only result. Otherwise a
+   * result set is created from the response's signature and first frame; if the response
+   * carries no signature, one is synthesized from the fields of {@code clazz}.
+   *
+   * @param clazz Class whose fields supply the column metadata when the response has no
+   *     signature; may be null, in which case the synthesized signature has no columns
+   * @param response The response to convert
+   * @return The equivalent MetaResultSet
+   */
+  private MetaResultSet toResultSet(Class<?> clazz,
+      Service.ResultSetResponse response) {
+    if (response.updateCount != -1) {
+      return MetaResultSet.count(response.connectionId, response.statementId,
+          response.updateCount);
+    }
+    Signature signature0 = response.signature;
+    if (signature0 == null) {
+      // No signature in the response: derive the columns from the given class, if any.
+      final List<ColumnMetaData> columns =
+          clazz == null
+              ? Collections.<ColumnMetaData>emptyList()
+              : fieldMetaData(clazz).columns;
+      signature0 = Signature.create(columns,
+          "?", Collections.<AvaticaParameter>emptyList(), CursorFactory.ARRAY,
+          Meta.StatementType.SELECT);
+    }
+    return MetaResultSet.create(response.connectionId, response.statementId,
+        response.ownStatement, signature0, response.firstFrame);
+  }
+
+  @Override public Map<DatabaseProperty, Object> getDatabaseProperties(ConnectionHandle ch) {
+    synchronized (this) {
+      // Compute map on first use, and cache
+      if (databaseProperties == null) {
+        databaseProperties =
+            service.apply(new Service.DatabasePropertyRequest(ch.id)).map;
+      }
+      return databaseProperties;
+    }
+  }
+
+  /**
+   * Syncs the connection state, then asks the service to create a statement and returns a
+   * handle built from the connection and statement ids in the response.
+   */
+  @Override public StatementHandle createStatement(final ConnectionHandle ch) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<StatementHandle>() {
+          @Override public StatementHandle call() {
+            // sync connection state if necessary
+            connectionSync(ch, new ConnectionPropertiesImpl());
+            final Service.CreateStatementResponse created =
+                service.apply(new Service.CreateStatementRequest(ch.id));
+            return new StatementHandle(created.connectionId, created.statementId, null);
+          }
+        });
+  }
+
+  /** Sends a close-statement request for the given handle; the response is not inspected. */
+  @Override public void closeStatement(final StatementHandle h) {
+    connection.invokeWithRetries(
+        new CallableWithoutException<Void>() {
+          @Override public Void call() {
+            service.apply(new Service.CloseStatementRequest(h.connectionId, h.id));
+            return null;
+          }
+        });
+  }
+
+  /**
+   * Sends an open-connection request carrying the connection id and the given properties;
+   * the response is not inspected.
+   */
+  @Override public void openConnection(final ConnectionHandle ch, final Map<String, String> info) {
+    connection.invokeWithRetries(
+        new CallableWithoutException<Void>() {
+          @Override public Void call() {
+            service.apply(new Service.OpenConnectionRequest(ch.id, info));
+            return null;
+          }
+        });
+  }
+
+  /**
+   * Sends a close-connection request and then drops the locally tracked properties of that
+   * connection; the response is not inspected.
+   */
+  @Override public void closeConnection(final ConnectionHandle ch) {
+    connection.invokeWithRetries(
+        new CallableWithoutException<Void>() {
+          @Override public Void call() {
+            service.apply(new Service.CloseConnectionRequest(ch.id));
+            // Forget the local state only after the request has been applied
+            propsMap.remove(ch.id);
+            return null;
+          }
+        });
+  }
+
+  /**
+   * Merges {@code connProps} into the properties tracked locally for the connection and,
+   * only when required, flushes them to the service.
+   *
+   * @return The properties from the service's response if an RPC was made, otherwise the
+   *     locally tracked properties
+   */
+  @Override public ConnectionProperties connectionSync(final ConnectionHandle ch,
+      final ConnectionProperties connProps) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<ConnectionProperties>() {
+          @Override public ConnectionProperties call() {
+            ConnectionPropertiesImpl tracked = propsMap.get(ch.id);
+            if (tracked == null) {
+              // First sync for this connection: start from a fresh, dirty set of properties
+              tracked = new ConnectionPropertiesImpl();
+              tracked.setDirty(true);
+              propsMap.put(ch.id, tracked);
+            }
+
+            // Only make an RPC if necessary. An RPC is necessary when both of these hold:
+            // there are local changes that need to be flushed to the server (the changes from
+            // connProps are merged in before checking), and connProps.isEmpty() (meaning this
+            // was a request for a value, not an override of a value). Otherwise, accumulate
+            // the change locally and return immediately.
+            final boolean dirty = tracked.merge(connProps).isDirty();
+            if (!dirty || !connProps.isEmpty()) {
+              return tracked;
+            }
+
+            final Service.ConnectionSyncResponse synced = service.apply(
+                new Service.ConnectionSyncRequest(ch.id, tracked));
+            propsMap.put(ch.id, (ConnectionPropertiesImpl) synced.connProps);
+            return synced.connProps;
+          }
+        });
+  }
+
+  /** Requests the catalogs from the service and converts the response to a result set. */
+  @Override public MetaResultSet getCatalogs(final ConnectionHandle ch) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          @Override public MetaResultSet call() {
+            final Service.ResultSetResponse catalogs =
+                service.apply(new Service.CatalogsRequest(ch.id));
+            return toResultSet(MetaCatalog.class, catalogs);
+          }
+        });
+  }
+
+  /** Requests the schemas from the service and converts the response to a result set. */
+  @Override public MetaResultSet getSchemas(final ConnectionHandle ch, final String catalog,
+      final Pat schemaPattern) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          @Override public MetaResultSet call() {
+            final Service.SchemasRequest request =
+                new Service.SchemasRequest(ch.id, catalog, schemaPattern.s);
+            final Service.ResultSetResponse schemas = service.apply(request);
+            return toResultSet(MetaSchema.class, schemas);
+          }
+        });
+  }
+
+  /** Requests the tables from the service and converts the response to a result set. */
+  @Override public MetaResultSet getTables(final ConnectionHandle ch, final String catalog,
+      final Pat schemaPattern, final Pat tableNamePattern, final List<String> typeList) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          @Override public MetaResultSet call() {
+            final Service.TablesRequest request =
+                new Service.TablesRequest(ch.id, catalog, schemaPattern.s,
+                    tableNamePattern.s, typeList);
+            final Service.ResultSetResponse tables = service.apply(request);
+            return toResultSet(MetaTable.class, tables);
+          }
+        });
+  }
+
+  /** Requests the table types from the service and converts the response to a result set. */
+  @Override public MetaResultSet getTableTypes(final ConnectionHandle ch) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          @Override public MetaResultSet call() {
+            final Service.ResultSetResponse tableTypes =
+                service.apply(new Service.TableTypesRequest(ch.id));
+            return toResultSet(MetaTableType.class, tableTypes);
+          }
+        });
+  }
+
+  /** Requests the type info from the service and converts the response to a result set. */
+  @Override public MetaResultSet getTypeInfo(final ConnectionHandle ch) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          @Override public MetaResultSet call() {
+            final Service.ResultSetResponse typeInfo =
+                service.apply(new Service.TypeInfoRequest(ch.id));
+            return toResultSet(MetaTypeInfo.class, typeInfo);
+          }
+        });
+  }
+
+  /** Requests the columns from the service and converts the response to a result set. */
+  @Override public MetaResultSet getColumns(final ConnectionHandle ch, final String catalog,
+      final Pat schemaPattern, final Pat tableNamePattern, final Pat columnNamePattern) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          @Override public MetaResultSet call() {
+            final Service.ColumnsRequest request =
+                new Service.ColumnsRequest(ch.id, catalog, schemaPattern.s,
+                    tableNamePattern.s, columnNamePattern.s);
+            final Service.ResultSetResponse columns = service.apply(request);
+            return toResultSet(MetaColumn.class, columns);
+          }
+        });
+  }
+
+  /**
+   * Syncs the connection state, then sends a prepare request and returns the statement
+   * handle contained in the response.
+   */
+  @Override public StatementHandle prepare(final ConnectionHandle ch, final String sql,
+      final long maxRowCount) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<StatementHandle>() {
+          @Override public StatementHandle call() {
+            // sync connection state if necessary
+            connectionSync(ch, new ConnectionPropertiesImpl());
+            final Service.PrepareResponse prepared =
+                service.apply(new Service.PrepareRequest(ch.id, sql, maxRowCount));
+            return prepared.statement;
+          }
+        });
+  }
+
+  /**
+   * Syncs the connection state, sends a prepare-and-execute request and drives the callback
+   * with the first result of the response.
+   *
+   * <p>The callback is cleared, the request is sent and the first result (if any) is assigned
+   * to the callback while holding the callback's monitor; {@code callback.execute()} runs
+   * after the monitor is released. Every result of the response is then converted to a
+   * {@code MetaResultSet}.
+   *
+   * @throws NoSuchStatementException If the response reports that the statement is missing
+   */
+  @Override public ExecuteResult prepareAndExecute(final StatementHandle h, final String sql,
+      final long maxRowCount, final PrepareCallback callback) throws NoSuchStatementException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ExecuteResult>() {
+            @Override public ExecuteResult call() {
+              // sync connection state if necessary
+              connectionSync(new ConnectionHandle(h.connectionId), new ConnectionPropertiesImpl());
+              final Service.ExecuteResponse executed;
+              try {
+                synchronized (callback.getMonitor()) {
+                  callback.clear();
+                  executed = service.apply(
+                      new Service.PrepareAndExecuteRequest(h.connectionId,
+                          h.id, sql, maxRowCount));
+                  if (executed.missingStatement) {
+                    // call() cannot throw checked exceptions; unwrapped again below
+                    throw new RuntimeException(new NoSuchStatementException(h));
+                  }
+                  if (!executed.results.isEmpty()) {
+                    final Service.ResultSetResponse first = executed.results.get(0);
+                    callback.assign(first.signature, first.firstFrame, first.updateCount);
+                  }
+                }
+                callback.execute();
+                final List<MetaResultSet> resultSets = new ArrayList<>();
+                for (Service.ResultSetResponse each : executed.results) {
+                  resultSets.add(toResultSet(null, each));
+                }
+                return new ExecuteResult(resultSets);
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      // Restore the checked exception that was smuggled out of call()
+      final Throwable cause = e.getCause();
+      if (cause instanceof NoSuchStatementException) {
+        throw (NoSuchStatementException) cause;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Sends a fetch request for the statement and returns the frame from the response.
+   *
+   * @throws NoSuchStatementException If the response reports that the statement is missing
+   * @throws MissingResultsException If the response reports that the results are missing
+   */
+  @Override public Frame fetch(final StatementHandle h, final long offset,
+      final int fetchMaxRowCount) throws NoSuchStatementException, MissingResultsException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<Frame>() {
+            @Override public Frame call() {
+              final Service.FetchResponse fetched = service.apply(
+                  new Service.FetchRequest(h.connectionId, h.id, offset, fetchMaxRowCount));
+              // call() cannot throw checked exceptions; they are unwrapped again below
+              if (fetched.missingStatement) {
+                throw new RuntimeException(new NoSuchStatementException(h));
+              }
+              if (fetched.missingResults) {
+                throw new RuntimeException(new MissingResultsException(h));
+              }
+              return fetched.frame;
+            }
+          });
+    } catch (RuntimeException e) {
+      final Throwable cause = e.getCause();
+      if (cause instanceof NoSuchStatementException) {
+        throw (NoSuchStatementException) cause;
+      }
+      if (cause instanceof MissingResultsException) {
+        throw (MissingResultsException) cause;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Sends an execute request for the statement and converts every result of the response
+   * to a {@code MetaResultSet}.
+   *
+   * @throws NoSuchStatementException If the response reports that the statement is missing
+   */
+  @Override public ExecuteResult execute(final StatementHandle h,
+      final List<TypedValue> parameterValues, final long maxRowCount)
+      throws NoSuchStatementException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ExecuteResult>() {
+            @Override public ExecuteResult call() {
+              final Service.ExecuteResponse executed = service.apply(
+                  new Service.ExecuteRequest(h, parameterValues, maxRowCount));
+
+              if (executed.missingStatement) {
+                // call() cannot throw checked exceptions; unwrapped again below
+                throw new RuntimeException(new NoSuchStatementException(h));
+              }
+
+              final List<MetaResultSet> resultSets = new ArrayList<>();
+              for (Service.ResultSetResponse each : executed.results) {
+                resultSets.add(toResultSet(null, each));
+              }
+
+              return new ExecuteResult(resultSets);
+            }
+          });
+    } catch (RuntimeException e) {
+      final Throwable cause = e.getCause();
+      if (cause instanceof NoSuchStatementException) {
+        throw (NoSuchStatementException) cause;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Sends a sync-results request for the statement and returns the {@code moreResults} flag
+   * of the response.
+   *
+   * @throws NoSuchStatementException If the response reports that the statement is missing
+   */
+  @Override public boolean syncResults(final StatementHandle h, final QueryState state,
+      final long offset) throws NoSuchStatementException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<Boolean>() {
+            @Override public Boolean call() {
+              final Service.SyncResultsResponse synced = service.apply(
+                  new Service.SyncResultsRequest(h.connectionId, h.id, state, offset));
+              if (synced.missingStatement) {
+                // call() cannot throw checked exceptions; unwrapped again below
+                throw new RuntimeException(new NoSuchStatementException(h));
+              }
+              return synced.moreResults;
+            }
+          });
+    } catch (RuntimeException e) {
+      final Throwable cause = e.getCause();
+      if (cause instanceof NoSuchStatementException) {
+        throw (NoSuchStatementException) cause;
+      }
+      throw e;
+    }
+  }
+
+  /** Sends a commit request for the given connection; the response is not inspected. */
+  @Override public void commit(final ConnectionHandle ch) {
+    connection.invokeWithRetries(new CallableWithoutException<Void>() {
+      @Override public Void call() {
+        service.apply(new Service.CommitRequest(ch.id));
+        return null;
+      }
+    });
+  }
+
+  /** Sends a rollback request for the given connection; the response is not inspected. */
+  @Override public void rollback(final ConnectionHandle ch) {
+    connection.invokeWithRetries(new CallableWithoutException<Void>() {
+      @Override public Void call() {
+        service.apply(new Service.RollbackRequest(ch.id));
+        return null;
+      }
+    });
+  }
+}
+
+// End RemoteMeta.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
new file mode 100644
index 0000000..828513a
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import java.io.IOException;
+
+/**
+ * {@link ProtobufService} that hands each serialized request to an {@link AvaticaHttpClient}
+ * and decodes the returned bytes with a {@link ProtobufTranslation}.
+ */
+public class RemoteProtobufService extends ProtobufService {
+  private final AvaticaHttpClient client;
+  private final ProtobufTranslation translation;
+
+  public RemoteProtobufService(AvaticaHttpClient client, ProtobufTranslation translation) {
+    this.client = client;
+    this.translation = translation;
+  }
+
+  /** Sends {@code request}; an {@link ErrorResponse} reply is thrown rather than returned. */
+  @Override public Response _apply(Request request) {
+    final Response reply = roundTrip(request);
+    if (!(reply instanceof ErrorResponse)) {
+      return reply;
+    }
+    throw ((ErrorResponse) reply).toException();
+  }
+
+  /** Serializes, sends and parses; any {@link IOException} is wrapped unchecked. */
+  private Response roundTrip(Request request) {
+    try {
+      return translation.parseResponse(client.send(translation.serializeRequest(request)));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
+
+// End RemoteProtobufService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java
new file mode 100644
index 0000000..d4828b5
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * {@link JsonService} that ships each JSON request to a remote server (usually HTTP)
+ * through an {@link AvaticaHttpClient}, using UTF-8 in both directions.
+ */
+public class RemoteService extends JsonService {
+  private final AvaticaHttpClient client;
+
+  public RemoteService(AvaticaHttpClient client) {
+    this.client = client;
+  }
+
+  @Override public String apply(String request) {
+    final byte[] outbound = request.getBytes(StandardCharsets.UTF_8);
+    final byte[] inbound = client.send(outbound); // raw reply bytes from the client
+    return new String(inbound, StandardCharsets.UTF_8);
+  }
+}
+
+// End RemoteService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
new file mode 100644
index 0000000..417c6ed
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+/**
+ * Turns the bytes of a protobuf Request message into the matching Avatica POJO request.
+ */
+public class RequestTranslator {
+
+  private final Parser<? extends Message> messageParser;
+  private final Service.Request deserializer;
+
+  public RequestTranslator(Parser<? extends Message> parser, Service.Request impl) {
+    this.messageParser = parser;
+    this.deserializer = impl;
+  }
+
+  /** Parses {@code serializedMessage}, then has the configured request deserialize it. */
+  public Service.Request transform(ByteString serializedMessage)
+      throws InvalidProtocolBufferException {
+    // This should already be an aliased CodedInputStream from the WireMessage parsing.
+    return deserializer.deserialize(messageParser.parseFrom(serializedMessage.newCodedInput()));
+  }
+}
+
+// End RequestTranslator.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java
new file mode 100644
index 0000000..0311e13
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+/**
+ * Turns the bytes of a protobuf Response message into the matching Avatica POJO response.
+ */
+public class ResponseTranslator {
+
+  private final Parser<? extends Message> messageParser;
+  private final Service.Response deserializer;
+
+  public ResponseTranslator(Parser<? extends Message> parser, Service.Response impl) {
+    this.messageParser = parser;
+    this.deserializer = impl;
+  }
+
+  /** Parses {@code serializedMessage}, then has the configured response deserialize it. */
+  public Service.Response transform(ByteString serializedMessage)
+      throws InvalidProtocolBufferException {
+    return deserializer.deserialize(messageParser.parseFrom(serializedMessage));
+  }
+}
+
+// End ResponseTranslator.java