You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/09/17 02:10:54 UTC

[8/8] incubator-calcite git commit: [CALCITE-840] Protocol buffer serialization over HTTP for Avatica Server (Josh Elser)

[CALCITE-840] Protocol buffer serialization over HTTP for Avatica Server (Josh Elser)

Close apache/incubator-calcite#130


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/cb7c213c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/cb7c213c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/cb7c213c

Branch: refs/heads/master
Commit: cb7c213cd74a9d684d3707b35ec993e68ceb1da5
Parents: 6c81b86
Author: Josh Elser <el...@apache.org>
Authored: Fri Sep 11 16:35:17 2015 -0400
Committer: Julian Hyde <jh...@apache.org>
Committed: Mon Sep 14 20:36:44 2015 -0700

----------------------------------------------------------------------
 LICENSE                                         |    42 +
 avatica-server/pom.xml                          |     5 +
 .../avatica/server/AvaticaProtobufHandler.java  |    65 +
 .../calcite/avatica/server/HandlerFactory.java  |    48 +
 .../org/apache/calcite/avatica/server/Main.java |    43 +-
 .../calcite/avatica/RemoteDriverMockTest.java   |   207 +
 .../calcite/avatica/RemoteDriverTest.java       |   422 +-
 .../calcite/avatica/remote/RemoteMetaTest.java  |    62 +-
 .../avatica/server/HandlerFactoryTest.java      |    58 +
 avatica/pom.xml                                 |    30 +
 .../calcite/avatica/AvaticaParameter.java       |    87 +
 .../apache/calcite/avatica/AvaticaUtils.java    |    11 +-
 .../avatica/BuiltInConnectionProperty.java      |     5 +-
 .../apache/calcite/avatica/ColumnMetaData.java  |   403 +-
 .../calcite/avatica/ConnectionConfig.java       |     1 +
 .../calcite/avatica/ConnectionConfigImpl.java   |     4 +
 .../avatica/ConnectionPropertiesImpl.java       |   147 +
 .../java/org/apache/calcite/avatica/Meta.java   |   474 +-
 .../org/apache/calcite/avatica/MetaImpl.java    |     2 +-
 .../apache/calcite/avatica/proto/Common.java    | 12364 +++++++++++++++++
 .../apache/calcite/avatica/proto/Requests.java  |  8354 +++++++++++
 .../apache/calcite/avatica/proto/Responses.java |  5793 ++++++++
 .../calcite/avatica/remote/AbstractService.java |   126 +
 .../remote/AvaticaRemoteConnectionProperty.java |     3 +-
 .../apache/calcite/avatica/remote/Driver.java   |    37 +-
 .../apache/calcite/avatica/remote/Handler.java  |     6 +-
 .../calcite/avatica/remote/JsonHandler.java     |     2 +-
 .../calcite/avatica/remote/JsonService.java     |   100 +-
 .../avatica/remote/LocalProtobufService.java    |    58 +
 .../avatica/remote/MockProtobufService.java     |   120 +
 .../calcite/avatica/remote/ProtobufHandler.java |    63 +
 .../calcite/avatica/remote/ProtobufService.java |   102 +
 .../avatica/remote/ProtobufTranslation.java     |    66 +
 .../avatica/remote/ProtobufTranslationImpl.java |   217 +
 .../calcite/avatica/remote/RemoteMeta.java      |     4 +-
 .../avatica/remote/RemoteProtobufService.java   |    66 +
 .../avatica/remote/RequestTranslator.java       |    44 +
 .../avatica/remote/ResponseTranslator.java      |    44 +
 .../apache/calcite/avatica/remote/Service.java  |  1520 +-
 .../calcite/avatica/remote/TypedValue.java      |   198 +
 .../calcite/avatica/util/AbstractCursor.java    |     7 +
 avatica/src/main/protobuf/common.proto          |   174 +
 avatica/src/main/protobuf/requests.proto        |   112 +
 avatica/src/main/protobuf/responses.proto       |    78 +
 avatica/src/main/scripts/generate-protobuf.sh   |    99 +
 .../org/apache/calcite/avatica/FrameTest.java   |    99 +
 .../avatica/remote/ProtobufHandlerTest.java     |   122 +
 .../remote/ProtobufTranslationImplTest.java     |   285 +
 .../calcite/avatica/remote/TypedValueTest.java  |   126 +
 core/pom.xml                                    |    12 -
 example/csv/pom.xml                             |    12 +-
 pom.xml                                         |    10 +
 site/_docs/howto.md                             |    44 +-
 src/main/config/checkstyle/suppressions.xml     |     1 +
 54 files changed, 32314 insertions(+), 270 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index f7b9863..0f5c755 100644
--- a/LICENSE
+++ b/LICENSE
@@ -266,3 +266,45 @@ SIL Open Font License (OFL) - http://scripts.sil.org/OFL/
 
 - site/fonts/fontawesome-webfont.*
    Font-awesome font files v4.0.3 (http://fortawesome.github.io/Font-Awesome/)
+
+-----------------------------------------------------------------------
+ 3-clause BSD license
+-----------------------------------------------------------------------
+
+The Apache Calcite project bundles Protocol Buffers, which is available
+under the following "3-clause BSD" license:
+
+    Copyright 2014, Google Inc. All rights reserved.
+
+    Redistribution and use in source and binary forms, with or
+    without modification, are permitted provided that the following
+    conditions are met:
+
+    Redistributions of source code must retain the above copyright
+    notice, this list of conditions and the following disclaimer.
+
+    Redistributions in binary form must reproduce the above
+    copyright notice, this list of conditions and the following disclaimer
+    in the documentation and/or other materials provided with the
+    distribution.
+
+    Neither the name of Google Inc. nor the names of its
+    contributors may be used to endorse or promote products derived from
+    this software without specific prior written permission.
+
+    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+    A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+    OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+    SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+    LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+    THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+    OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+    Code generated by the Protocol Buffer compiler is owned by the owner
+    of the input file used when generating it. This code is not
+    standalone and requires a support library to be linked with it. This
+    support library is itself covered by the above license.

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica-server/pom.xml
----------------------------------------------------------------------
diff --git a/avatica-server/pom.xml b/avatica-server/pom.xml
index efaa4eb..1695c07 100644
--- a/avatica-server/pom.xml
+++ b/avatica-server/pom.xml
@@ -90,6 +90,11 @@ limitations under the License.
       <artifactId>hsqldb</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
new file mode 100644
index 0000000..419ad1f
--- /dev/null
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.AvaticaUtils;
+import org.apache.calcite.avatica.remote.ProtobufHandler;
+import org.apache.calcite.avatica.remote.ProtobufTranslationImpl;
+import org.apache.calcite.avatica.remote.Service;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Jetty handler that executes Avatica JSON request-responses.
+ */
+public class AvaticaProtobufHandler extends AbstractHandler {
+
+  private final ProtobufHandler pbHandler;
+
+  public AvaticaProtobufHandler(Service service) {
+    this.pbHandler = new ProtobufHandler(service, new ProtobufTranslationImpl());
+  }
+
+  public void handle(String target, Request baseRequest,
+      HttpServletRequest request, HttpServletResponse response)
+      throws IOException, ServletException {
+    response.setContentType("application/octet-stream;charset=utf-8");
+    response.setStatus(HttpServletResponse.SC_OK);
+    if (request.getMethod().equals("POST")) {
+      byte[] requestBytes;
+      try (ServletInputStream inputStream = request.getInputStream()) {
+        requestBytes = AvaticaUtils.readFullyToBytes(inputStream);
+      }
+
+      byte[] responseBytes = pbHandler.apply(requestBytes);
+
+      baseRequest.setHandled(true);
+      response.getOutputStream().write(responseBytes);
+    }
+  }
+
+}
+
+// End AvaticaProtobufHandler.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica-server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
new file mode 100644
index 0000000..c98f593
--- /dev/null
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.remote.Driver;
+import org.apache.calcite.avatica.remote.Service;
+
+import org.eclipse.jetty.server.Handler;
+
+/**
+ * Factory that instantiates the desired implementation, typically differing on the method
+ * used to serialize messages, for use in the Avatica server.
+ */
+public class HandlerFactory {
+
+  /**
+   * The desired implementation for the given serialization method.
+   *
+   * @param serialization The desired message serialization
+   */
+  public Handler getHandler(Service service, Driver.Serialization serialization) {
+    switch (serialization) {
+    case JSON:
+      return new AvaticaHandler(service);
+    case PROTOBUF:
+      return new AvaticaProtobufHandler(service);
+    default:
+      throw new IllegalArgumentException("Unknown Avatica handler for " + serialization.name());
+    }
+  }
+
+}
+
+// End HandlerFactory.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
index 7876e1c..82691f6 100644
--- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
+++ b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java
@@ -20,6 +20,8 @@ import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.remote.LocalService;
 import org.apache.calcite.avatica.remote.Service;
 
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
 import java.util.Arrays;
 
 /**
@@ -36,7 +38,38 @@ public class Main {
   }
 
   /**
-   * Creates and starts an {@link HttpServer}.
+   * Factory that instantiates Jetty Handlers
+   */
+  public interface HandlerFactory {
+    AbstractHandler createHandler(Service service);
+  }
+
+  private static final HandlerFactory JSON_HANDLER_FACTORY = new HandlerFactory() {
+    public AbstractHandler createHandler(Service service) {
+      return new AvaticaHandler(service);
+    }
+  };
+
+  /**
+   * Creates and starts an {@link HttpServer} using JSON POJO serialization of requests/responses.
+   *
+   * <p>Arguments are as follows:
+   * <ul>
+   *   <li>args[0]: the {@link org.apache.calcite.avatica.Meta.Factory} class
+   *       name
+   *   <li>args[1+]: arguments passed along to
+   *   {@link org.apache.calcite.avatica.Meta.Factory#create(java.util.List)}
+   * </ul>
+   *
+   * @param args Command-line arguments
+   */
+  public static HttpServer start(String[] args) throws ClassNotFoundException,
+         InstantiationException, IllegalAccessException {
+    return start(args, 8765, JSON_HANDLER_FACTORY);
+  }
+
+  /**
+   * Creates and starts an {@link HttpServer} using the given factory to create the Handler.
    *
    * <p>Arguments are as follows:
    * <ul>
@@ -47,16 +80,18 @@ public class Main {
    * </ul>
    *
    * @param args Command-line arguments
+   * @param port Server port to bind
+   * @param handlerFactory Factory to create the handler used by the server
    */
-  public static HttpServer start(String[] args)
+  public static HttpServer start(String[] args, int port, HandlerFactory handlerFactory)
       throws ClassNotFoundException, InstantiationException,
       IllegalAccessException {
     String factoryClassName = args[0];
-    Class factoryClass = Class.forName(factoryClassName);
+    Class<?> factoryClass = Class.forName(factoryClassName);
     Meta.Factory factory = (Meta.Factory) factoryClass.newInstance();
     Meta meta = factory.create(Arrays.asList(args).subList(1, args.length));
     Service service = new LocalService(meta);
-    HttpServer server = new HttpServer(8765, new AvaticaHandler(service));
+    HttpServer server = new HttpServer(port, handlerFactory.createHandler(service));
     server.start();
     return server;
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java
new file mode 100644
index 0000000..7594391
--- /dev/null
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import org.apache.calcite.avatica.remote.MockJsonService;
+import org.apache.calcite.avatica.remote.MockProtobufService.MockProtobufServiceFactory;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * RemoteDriver tests that use a Mock implementation of a Connection.
+ */
+@RunWith(Parameterized.class)
+public class RemoteDriverMockTest {
+  public static final String MJS = MockJsonService.Factory.class.getName();
+  public static final String MPBS = MockProtobufServiceFactory.class.getName();
+
+  private static Connection mjs() throws SQLException {
+    return DriverManager.getConnection("jdbc:avatica:remote:factory=" + MJS);
+  }
+
+  private static Connection mpbs() throws SQLException {
+    return DriverManager.getConnection("jdbc:avatica:remote:factory=" + MPBS);
+  }
+
+  @Parameters
+  public static List<Object[]> parameters() {
+    List<Object[]> parameters = new ArrayList<>();
+
+    parameters.add(new Object[] {new Callable<Connection>() {
+      public Connection call() throws SQLException {
+        return mjs();
+      }
+    } });
+
+    parameters.add(new Object[] {new Callable<Connection>() {
+      public Connection call() throws SQLException {
+        return mpbs();
+      }
+    } });
+
+    return parameters;
+  }
+
+  private final Callable<Connection> connectionFunctor;
+
+  public RemoteDriverMockTest(Callable<Connection> functor) {
+    this.connectionFunctor = functor;
+  }
+
+  private Connection getMockConnection() {
+    try {
+      return connectionFunctor.call();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test public void testRegister() throws Exception {
+    final Connection connection =
+        DriverManager.getConnection("jdbc:avatica:remote:");
+    assertThat(connection.isClosed(), is(false));
+    connection.close();
+    assertThat(connection.isClosed(), is(true));
+  }
+
+  @Test public void testSchemas() throws Exception {
+    final Connection connection = getMockConnection();
+    final ResultSet resultSet =
+        connection.getMetaData().getSchemas(null, null);
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertTrue(metaData.getColumnCount() >= 2);
+    assertEquals("TABLE_CATALOG", metaData.getColumnName(1));
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(2));
+    resultSet.close();
+    connection.close();
+  }
+
+  @Test public void testTables() throws Exception {
+    final Connection connection = getMockConnection();
+    final ResultSet resultSet =
+        connection.getMetaData().getTables(null, null, null, new String[0]);
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertTrue(metaData.getColumnCount() >= 3);
+    assertEquals("TABLE_CAT", metaData.getColumnName(1));
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(2));
+    assertEquals("TABLE_NAME", metaData.getColumnName(3));
+    resultSet.close();
+    connection.close();
+  }
+
+  @Ignore
+  @Test public void testNoFactory() throws Exception {
+    final Connection connection =
+        DriverManager.getConnection("jdbc:avatica:remote:");
+    assertThat(connection.isClosed(), is(false));
+    final ResultSet resultSet = connection.getMetaData().getSchemas();
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertEquals(2, metaData.getColumnCount());
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(1));
+    assertEquals("TABLE_CATALOG", metaData.getColumnName(2));
+    resultSet.close();
+    connection.close();
+    assertThat(connection.isClosed(), is(true));
+  }
+
+  @Ignore
+  @Test public void testCatalogsMock() throws Exception {
+    final Connection connection = getMockConnection();
+    assertThat(connection.isClosed(), is(false));
+    final ResultSet resultSet = connection.getMetaData().getSchemas();
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertEquals(2, metaData.getColumnCount());
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(1));
+    assertEquals("TABLE_CATALOG", metaData.getColumnName(2));
+    resultSet.close();
+    connection.close();
+    assertThat(connection.isClosed(), is(true));
+  }
+
+  @Ignore
+  @Test public void testStatementExecuteQueryMock() throws Exception {
+    checkStatementExecuteQuery(getMockConnection(), false);
+  }
+
+  @Ignore
+  @Test public void testPrepareExecuteQueryMock() throws Exception {
+    checkStatementExecuteQuery(getMockConnection(), true);
+  }
+
+  private void checkStatementExecuteQuery(Connection connection,
+      boolean prepare) throws SQLException {
+    final String sql = "select * from (\n"
+        + "  values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)";
+    final Statement statement;
+    final ResultSet resultSet;
+    final ParameterMetaData parameterMetaData;
+    if (prepare) {
+      final PreparedStatement ps = connection.prepareStatement(sql);
+      statement = ps;
+      parameterMetaData = ps.getParameterMetaData();
+      resultSet = ps.executeQuery();
+    } else {
+      statement = connection.createStatement();
+      parameterMetaData = null;
+      resultSet = statement.executeQuery(sql);
+    }
+    if (parameterMetaData != null) {
+      assertThat(parameterMetaData.getParameterCount(), equalTo(0));
+    }
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertEquals(2, metaData.getColumnCount());
+    assertEquals("C1", metaData.getColumnName(1));
+    assertEquals("C2", metaData.getColumnName(2));
+    assertTrue(resultSet.next());
+    assertTrue(resultSet.next());
+    assertTrue(resultSet.next());
+    assertFalse(resultSet.next());
+    resultSet.close();
+    statement.close();
+    connection.close();
+  }
+
+}
+
+// End RemoteDriverMockTest.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
index 00f7a16..5c0dd2b 100644
--- a/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/RemoteDriverTest.java
@@ -17,9 +17,12 @@
 package org.apache.calcite.avatica;
 
 import org.apache.calcite.avatica.jdbc.JdbcMeta;
+import org.apache.calcite.avatica.remote.JsonService;
 import org.apache.calcite.avatica.remote.LocalJsonService;
+import org.apache.calcite.avatica.remote.LocalProtobufService;
 import org.apache.calcite.avatica.remote.LocalService;
-import org.apache.calcite.avatica.remote.MockJsonService;
+import org.apache.calcite.avatica.remote.ProtobufTranslation;
+import org.apache.calcite.avatica.remote.ProtobufTranslationImpl;
 import org.apache.calcite.avatica.remote.Service;
 
 import com.google.common.cache.Cache;
@@ -27,6 +30,9 @@ import com.google.common.cache.Cache;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.lang.reflect.Field;
 import java.sql.Connection;
@@ -46,6 +52,7 @@ import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -60,24 +67,25 @@ import static org.junit.Assert.fail;
 /**
  * Unit test for Avatica Remote JDBC driver.
  */
+@RunWith(Parameterized.class)
 public class RemoteDriverTest {
-  public static final String MJS =
-      MockJsonService.Factory.class.getName();
-
   public static final String LJS =
       LocalJdbcServiceFactory.class.getName();
 
   public static final String QRJS =
       QuasiRemoteJdbcServiceFactory.class.getName();
 
+  public static final String QRPBS =
+      QuasiRemotePBJdbcServiceFactory.class.getName();
+
   private static final ConnectionSpec CONNECTION_SPEC = ConnectionSpec.HSQLDB;
 
-  private Connection mjs() throws SQLException {
-    return DriverManager.getConnection("jdbc:avatica:remote:factory=" + MJS);
+  private static Connection ljs() throws SQLException {
+    return DriverManager.getConnection("jdbc:avatica:remote:factory=" + QRJS);
   }
 
-  private Connection ljs() throws SQLException {
-    return DriverManager.getConnection("jdbc:avatica:remote:factory=" + QRJS);
+  private static Connection lpbs() throws SQLException {
+    return DriverManager.getConnection("jdbc:avatica:remote:factory=" + QRPBS);
   }
 
   private Connection canon() throws SQLException {
@@ -85,11 +93,114 @@ public class RemoteDriverTest {
         CONNECTION_SPEC.username, CONNECTION_SPEC.password);
   }
 
+  /**
+   * Interface that allows for alternate ways to access internals to the Connection for testing
+   * purposes.
+   */
+  interface ConnectionInternals {
+    /**
+     * Reaches into the guts of a quasi-remote connection and pull out the
+     * statement map from the other side.
+     *
+     * <p>TODO: refactor tests to replace reflection with package-local access
+     */
+    Cache<Integer, Object> getRemoteStatementMap(AvaticaConnection connection) throws Exception;
+
+    /**
+     * Reaches into the guts of a quasi-remote connection and pull out the
+     * connection map from the other side.
+     *
+     * <p>TODO: refactor tests to replace reflection with package-local access
+     */
+    Cache<String, Connection> getRemoteConnectionMap(AvaticaConnection connection) throws Exception;
+  }
+
+  // Run each test with the LocalJsonService and LocalProtobufService
+  @Parameters
+  public static List<Object[]> parameters() {
+    List<Object[]> connections = new ArrayList<>();
+
+    // Json and Protobuf operations should be equivalent -- tests against one work on the other
+    // Each test needs to get a fresh Connection and also access some internals on that Connection.
+
+    connections.add(
+      new Object[] {
+        new Callable<Connection>() {
+          public Connection call() {
+            try {
+              return ljs();
+            } catch (SQLException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        },
+        new QuasiRemoteJdbcServiceInternals(),
+        new Callable<RequestInspection>() {
+          public RequestInspection call() throws Exception {
+            assert null != QuasiRemoteJdbcServiceFactory.requestInspection;
+            return QuasiRemoteJdbcServiceFactory.requestInspection;
+          }
+        } });
+
+    // TODO write the ConnectionInternals implementation
+    connections.add(
+      new Object[] {
+        new Callable<Connection>() {
+          public Connection call() {
+            try {
+              return lpbs();
+            } catch (SQLException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        },
+        new QuasiRemoteProtobufJdbcServiceInternals(),
+        new Callable<RequestInspection>() {
+          public RequestInspection call() throws Exception {
+            assert null != QuasiRemotePBJdbcServiceFactory.requestInspection;
+            return QuasiRemotePBJdbcServiceFactory.requestInspection;
+          }
+        } });
+
+    return connections;
+  }
+
+  private final Callable<Connection> localConnectionCallable;
+  private final ConnectionInternals localConnectionInternals;
+  private final Callable<RequestInspection> requestInspectionCallable;
+
+  public RemoteDriverTest(Callable<Connection> localConnectionCallable,
+      ConnectionInternals internals, Callable<RequestInspection> requestInspectionCallable) {
+    this.localConnectionCallable = localConnectionCallable;
+    this.localConnectionInternals = internals;
+    this.requestInspectionCallable = requestInspectionCallable;
+  }
+
+  private Connection getLocalConnection() {
+    try {
+      return localConnectionCallable.call();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ConnectionInternals getLocalConnectionInternals() {
+    return localConnectionInternals;
+  }
+
+  private RequestInspection getRequestInspection() {
+    try {
+      return requestInspectionCallable.call();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /** Executes a lambda for the canonical connection and the local
    * connection. */
-  public void eachConnection(ConnectionFunction f) throws Exception {
+  public void eachConnection(ConnectionFunction f, Connection localConn) throws Exception {
     for (int i = 0; i < 2; i++) {
-      try (Connection connection = i == 0 ? canon() : ljs()) {
+      try (Connection connection = i == 0 ? canon() : localConn) {
         f.apply(connection);
       }
     }
@@ -98,6 +209,7 @@ public class RemoteDriverTest {
   @Before
   public void before() throws Exception {
     QuasiRemoteJdbcServiceFactory.initService();
+    QuasiRemotePBJdbcServiceFactory.initService();
   }
 
   @Test public void testRegister() throws Exception {
@@ -108,23 +220,10 @@ public class RemoteDriverTest {
     assertThat(connection.isClosed(), is(true));
   }
 
-  @Test public void testSchemas() throws Exception {
-    final Connection connection = mjs();
-    final ResultSet resultSet =
-        connection.getMetaData().getSchemas(null, null);
-    assertFalse(resultSet.next());
-    final ResultSetMetaData metaData = resultSet.getMetaData();
-    assertTrue(metaData.getColumnCount() >= 2);
-    assertEquals("TABLE_CATALOG", metaData.getColumnName(1));
-    assertEquals("TABLE_SCHEM", metaData.getColumnName(2));
-    resultSet.close();
-    connection.close();
-  }
-
   @Test public void testDatabaseProperties() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      final Connection connection = ljs();
+      final Connection connection = getLocalConnection();
       for (Meta.DatabaseProperty p : Meta.DatabaseProperty.values()) {
         switch (p) {
         case GET_NUMERIC_FUNCTIONS:
@@ -164,24 +263,10 @@ public class RemoteDriverTest {
     }
   }
 
-  @Test public void testTables() throws Exception {
-    final Connection connection = mjs();
-    final ResultSet resultSet =
-        connection.getMetaData().getTables(null, null, null, new String[0]);
-    assertFalse(resultSet.next());
-    final ResultSetMetaData metaData = resultSet.getMetaData();
-    assertTrue(metaData.getColumnCount() >= 3);
-    assertEquals("TABLE_CAT", metaData.getColumnName(1));
-    assertEquals("TABLE_SCHEM", metaData.getColumnName(2));
-    assertEquals("TABLE_NAME", metaData.getColumnName(3));
-    resultSet.close();
-    connection.close();
-  }
-
   @Test public void testTypeInfo() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      final Connection connection = ljs();
+      final Connection connection = getLocalConnection();
       final ResultSet resultSet =
           connection.getMetaData().getTypeInfo();
       assertTrue(resultSet.next());
@@ -194,6 +279,7 @@ public class RemoteDriverTest {
       assertEquals("SQL_DATETIME_SUB", metaData.getColumnName(17));
       assertEquals("NUM_PREC_RADIX", metaData.getColumnName(18));
       resultSet.close();
+      connection.close();
     } finally {
       ConnectionSpec.getDatabaseLock().unlock();
     }
@@ -215,35 +301,10 @@ public class RemoteDriverTest {
     assertThat(connection.isClosed(), is(true));
   }
 
-  @Ignore
-  @Test public void testCatalogsMock() throws Exception {
-    final Connection connection = mjs();
-    assertThat(connection.isClosed(), is(false));
-    final ResultSet resultSet = connection.getMetaData().getSchemas();
-    assertFalse(resultSet.next());
-    final ResultSetMetaData metaData = resultSet.getMetaData();
-    assertEquals(2, metaData.getColumnCount());
-    assertEquals("TABLE_SCHEM", metaData.getColumnName(1));
-    assertEquals("TABLE_CATALOG", metaData.getColumnName(2));
-    resultSet.close();
-    connection.close();
-    assertThat(connection.isClosed(), is(true));
-  }
-
   @Test public void testStatementExecuteQueryLocal() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      checkStatementExecuteQuery(ljs(), false);
-    } finally {
-      ConnectionSpec.getDatabaseLock().unlock();
-    }
-  }
-
-  @Ignore
-  @Test public void testStatementExecuteQueryMock() throws Exception {
-    ConnectionSpec.getDatabaseLock().lock();
-    try {
-      checkStatementExecuteQuery(mjs(), false);
+      checkStatementExecuteQuery(getLocalConnection(), false);
     } finally {
       ConnectionSpec.getDatabaseLock().unlock();
     }
@@ -252,21 +313,12 @@ public class RemoteDriverTest {
   @Test public void testPrepareExecuteQueryLocal() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      checkStatementExecuteQuery(ljs(), true);
+      checkStatementExecuteQuery(getLocalConnection(), true);
     } finally {
       ConnectionSpec.getDatabaseLock().unlock();
     }
   }
 
-  @Ignore
-  @Test public void testPrepareExecuteQueryMock() throws Exception {
-    ConnectionSpec.getDatabaseLock().lock();
-    try {
-      checkStatementExecuteQuery(mjs(), true);
-    } finally {
-      ConnectionSpec.getDatabaseLock().unlock();
-    }
-  }
 
   private void checkStatementExecuteQuery(Connection connection,
       boolean prepare) throws SQLException {
@@ -304,7 +356,7 @@ public class RemoteDriverTest {
   @Test public void testStatementExecuteLocal() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      checkStatementExecute(ljs(), false);
+      checkStatementExecute(getLocalConnection(), false);
     } finally {
       ConnectionSpec.getDatabaseLock().unlock();
     }
@@ -315,11 +367,11 @@ public class RemoteDriverTest {
     try {
       // Creating a > 100 rows queries to enable fetch request
       String sql = "select * from emp cross join emp";
-      checkExecuteFetch(ljs(), sql, false, 1);
+      checkExecuteFetch(getLocalConnection(), sql, false, 1);
       // PreparedStatement needed an extra fetch, as the execute will
       // trigger the 1st fetch. Where statement execute will execute direct
       // with results back.
-      checkExecuteFetch(ljs(), sql, true, 2);
+      checkExecuteFetch(getLocalConnection(), sql, true, 2);
     } finally {
       ConnectionSpec.getDatabaseLock().unlock();
     }
@@ -329,7 +381,7 @@ public class RemoteDriverTest {
     int fetchCountMatch) throws SQLException {
     final Statement exeStatement;
     final ResultSet results;
-    LoggingLocalJsonService.THREAD_LOG.get().enableAndClear();
+    getRequestInspection().getRequestLogger().enableAndClear();
     if (isPrepare) {
       PreparedStatement statement = conn.prepareStatement(sql);
       exeStatement = statement;
@@ -346,20 +398,20 @@ public class RemoteDriverTest {
     }
     results.close();
     exeStatement.close();
-    List<String[]> x = LoggingLocalJsonService.THREAD_LOG.get().getAndDisable();
+    List<String[]> x = getRequestInspection().getRequestLogger().getAndDisable();
     for (String[] pair : x) {
       if (pair[0].contains("\"request\":\"fetch")) {
         fetchCount++;
       }
     }
     assertEquals(count, 196);
-    assertEquals(fetchCount, fetchCountMatch);
+    assertEquals(fetchCountMatch, fetchCount);
   }
 
   @Test public void testStatementExecuteLocalMaxRow() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      checkStatementExecute(ljs(), false, 2);
+      checkStatementExecute(getLocalConnection(), false, 2);
     } finally {
       ConnectionSpec.getDatabaseLock().unlock();
     }
@@ -369,7 +421,7 @@ public class RemoteDriverTest {
   @Test public void testStatementPrepareExecuteLocalMaxRow() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      checkStatementExecute(ljs(), true, 2);
+      checkStatementExecute(getLocalConnection(), true, 2);
     } finally {
       ConnectionSpec.getDatabaseLock().unlock();
     }
@@ -378,7 +430,7 @@ public class RemoteDriverTest {
   @Test public void testPrepareExecuteLocal() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      checkStatementExecute(ljs(), true);
+      checkStatementExecute(getLocalConnection(), true);
     } finally {
       ConnectionSpec.getDatabaseLock().unlock();
     }
@@ -433,7 +485,7 @@ public class RemoteDriverTest {
         + "msg varchar(3) not null)";
     final String insert = "insert into TEST_TABLE values(1, 'foo')";
     final String update = "update TEST_TABLE set msg='bar' where id=1";
-    try (Connection connection = ljs();
+    try (Connection connection = getLocalConnection();
         Statement statement = connection.createStatement();
         PreparedStatement pstmt = connection.prepareStatement("values 1")) {
       // drop
@@ -533,7 +585,7 @@ public class RemoteDriverTest {
     try {
       final String query = "select * from EMP";
       try (Connection cannon = canon();
-          Connection underTest = ljs();
+          Connection underTest = getLocalConnection();
           Statement s1 = cannon.createStatement();
           Statement s2 = underTest.createStatement()) {
         assertTrue(s1.execute(query));
@@ -656,10 +708,10 @@ public class RemoteDriverTest {
 
   @Test public void testStatementLifecycle() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
-    try (AvaticaConnection connection = (AvaticaConnection) ljs()) {
+    try (AvaticaConnection connection = (AvaticaConnection) getLocalConnection()) {
       Map<Integer, AvaticaStatement> clientMap = connection.statementMap;
-      Cache<Integer, Object> serverMap =
-          QuasiRemoteJdbcServiceFactory.getRemoteStatementMap(connection);
+      Cache<Integer, Object> serverMap = getLocalConnectionInternals()
+          .getRemoteStatementMap(connection);
       // Other tests being run might leave statements in the cache.
       // The lock guards against more statements being cached during the test.
       serverMap.invalidateAll();
@@ -680,11 +732,10 @@ public class RemoteDriverTest {
     ConnectionSpec.getDatabaseLock().lock();
     try {
       final String sql = "select * from (values (1, 'a'))";
-      Connection conn1 = ljs();
-      Connection conn2 = ljs();
-      Cache<String, Connection> connectionMap =
-          QuasiRemoteJdbcServiceFactory.getRemoteConnectionMap(
-              (AvaticaConnection) conn1);
+      Connection conn1 = getLocalConnection();
+      Connection conn2 = getLocalConnection();
+      Cache<String, Connection> connectionMap = getLocalConnectionInternals()
+          .getRemoteConnectionMap((AvaticaConnection) conn1);
       // Other tests being run might leave connections in the cache.
       // The lock guards against more connections being cached during the test.
       connectionMap.invalidateAll();
@@ -716,9 +767,9 @@ public class RemoteDriverTest {
   @Test public void testPrepareBindExecuteFetch() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      LoggingLocalJsonService.THREAD_LOG.get().enableAndClear();
-      checkPrepareBindExecuteFetch(ljs());
-      List<String[]> x = LoggingLocalJsonService.THREAD_LOG.get().getAndDisable();
+      getRequestInspection().getRequestLogger().enableAndClear();
+      checkPrepareBindExecuteFetch(getLocalConnection());
+      List<String[]> x = getRequestInspection().getRequestLogger().getAndDisable();
       for (String[] pair : x) {
         System.out.println(pair[0] + "=" + pair[1]);
       }
@@ -779,7 +830,7 @@ public class RemoteDriverTest {
   @Test public void testPrepareBindExecuteFetchVarbinary() throws Exception {
     ConnectionSpec.getDatabaseLock().lock();
     try {
-      final Connection connection = ljs();
+      final Connection connection = getLocalConnection();
       final String sql = "select x'de' || ? as c from (values (1, 'a'))";
       final PreparedStatement ps =
           connection.prepareStatement(sql);
@@ -807,7 +858,7 @@ public class RemoteDriverTest {
             public void apply(Connection c1) throws Exception {
               checkPrepareBindExecuteFetchDate(c1);
             }
-          });
+          }, getLocalConnection());
     } finally {
       ConnectionSpec.getDatabaseLock().unlock();
     }
@@ -886,7 +937,7 @@ public class RemoteDriverTest {
             public void apply(Connection c1) throws Exception {
               checkDatabaseProperty(c1);
             }
-          });
+          }, getLocalConnection());
     } finally {
       ConnectionSpec.getDatabaseLock().unlock();
     }
@@ -920,6 +971,70 @@ public class RemoteDriverTest {
   }
 
   /**
+   * Factory that creates a fully-local Protobuf service.
+   */
+  public static class QuasiRemotePBJdbcServiceFactory implements Service.Factory {
+    private static Service service;
+
+    private static RequestInspection requestInspection;
+
+    static void initService() {
+      try {
+        final JdbcMeta jdbcMeta = new JdbcMeta(CONNECTION_SPEC.url,
+            CONNECTION_SPEC.username, CONNECTION_SPEC.password);
+        final LocalService localService = new LocalService(jdbcMeta);
+        service = new LoggingLocalProtobufService(localService, new ProtobufTranslationImpl());
+        requestInspection = (RequestInspection) service;
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override public Service create(AvaticaConnection connection) {
+      assert null != service;
+      return service;
+    }
+  }
+
+  /**
+   * Proxy that logs all requests passed into the {@link LocalProtobufService}.
+   */
+  public static class LoggingLocalProtobufService extends LocalProtobufService
+      implements RequestInspection {
+    private static final ThreadLocal<RequestLogger> THREAD_LOG =
+        new ThreadLocal<RequestLogger>() {
+          @Override protected RequestLogger initialValue() {
+            return new RequestLogger();
+          }
+        };
+
+    public LoggingLocalProtobufService(Service service, ProtobufTranslation translation) {
+      super(service, translation);
+    }
+
+    @Override public RequestLogger getRequestLogger() {
+      return THREAD_LOG.get();
+    }
+
+    @Override public Response _apply(Request request) {
+      final RequestLogger logger = THREAD_LOG.get();
+      try {
+        String jsonRequest = JsonService.MAPPER.writeValueAsString(request);
+        logger.requestStart(jsonRequest);
+
+        Response response = super._apply(request);
+
+        String jsonResponse = JsonService.MAPPER.writeValueAsString(response);
+        logger.requestEnd(jsonRequest, jsonResponse);
+
+        return response;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
    * Factory that creates a service based on a local JDBC connection.
    */
   public static class QuasiRemoteJdbcServiceFactory implements Service.Factory {
@@ -927,12 +1042,15 @@ public class RemoteDriverTest {
     /** a singleton instance that is recreated for each test */
     private static Service service;
 
+    private static RequestInspection requestInspection;
+
     static void initService() {
       try {
         final JdbcMeta jdbcMeta = new JdbcMeta(CONNECTION_SPEC.url,
             CONNECTION_SPEC.username, CONNECTION_SPEC.password);
         final LocalService localService = new LocalService(jdbcMeta);
         service = new LoggingLocalJsonService(localService);
+        requestInspection = (RequestInspection) service;
       } catch (SQLException e) {
         throw new RuntimeException(e);
       }
@@ -942,13 +1060,15 @@ public class RemoteDriverTest {
       assert service != null;
       return service;
     }
+  }
 
-    /**
-     * Reach into the guts of a quasi-remote connection and pull out the
-     * statement map from the other side.
-     * TODO: refactor tests to replace reflection with package-local access
-     */
-    static Cache<Integer, Object>
+  /**
+   * Implementation that reaches into current connection state via reflection to extract certain
+   * internal information.
+   */
+  public static class QuasiRemoteJdbcServiceInternals implements ConnectionInternals {
+
+    @Override public Cache<Integer, Object>
     getRemoteStatementMap(AvaticaConnection connection) throws Exception {
       Field metaF = AvaticaConnection.class.getDeclaredField("meta");
       metaF.setAccessible(true);
@@ -968,15 +1088,12 @@ public class RemoteDriverTest {
       Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("statementCache");
       jdbcMetaStatementMapF.setAccessible(true);
       //noinspection unchecked
-      return (Cache<Integer, Object>) jdbcMetaStatementMapF.get(serverMeta);
+      @SuppressWarnings("unchecked")
+      Cache<Integer, Object> cache = (Cache<Integer, Object>) jdbcMetaStatementMapF.get(serverMeta);
+      return cache;
     }
 
-    /**
-     * Reach into the guts of a quasi-remote connection and pull out the
-     * connection map from the other side.
-     * TODO: refactor tests to replace reflection with package-local access
-     */
-    static Cache<String, Connection>
+    @Override public Cache<String, Connection>
     getRemoteConnectionMap(AvaticaConnection connection) throws Exception {
       Field metaF = AvaticaConnection.class.getDeclaredField("meta");
       metaF.setAccessible(true);
@@ -996,13 +1113,84 @@ public class RemoteDriverTest {
       Field jdbcMetaConnectionCacheF = JdbcMeta.class.getDeclaredField("connectionCache");
       jdbcMetaConnectionCacheF.setAccessible(true);
       //noinspection unchecked
-      return (Cache<String, Connection>) jdbcMetaConnectionCacheF.get(serverMeta);
+      @SuppressWarnings("unchecked")
+      Cache<String, Connection> cache =
+          (Cache<String, Connection>) jdbcMetaConnectionCacheF.get(serverMeta);
+      return cache;
     }
   }
 
+  /**
+   * Implementation that reaches into current connection state via reflection to extract certain
+   * internal information.
+   */
+  public static class QuasiRemoteProtobufJdbcServiceInternals implements ConnectionInternals {
+
+    @Override public Cache<Integer, Object>
+    getRemoteStatementMap(AvaticaConnection connection) throws Exception {
+      Field metaF = AvaticaConnection.class.getDeclaredField("meta");
+      metaF.setAccessible(true);
+      Meta clientMeta = (Meta) metaF.get(connection);
+      Field remoteMetaServiceF = clientMeta.getClass().getDeclaredField("service");
+      remoteMetaServiceF.setAccessible(true);
+      LocalProtobufService remoteMetaService =
+          (LocalProtobufService) remoteMetaServiceF.get(clientMeta);
+      // Use the explicitly class to avoid issues with LoggingLocalJsonService
+      Field remoteMetaServiceServiceF = LocalProtobufService.class.getDeclaredField("service");
+      remoteMetaServiceServiceF.setAccessible(true);
+      LocalService remoteMetaServiceService =
+          (LocalService) remoteMetaServiceServiceF.get(remoteMetaService);
+      Field remoteMetaServiceServiceMetaF =
+          remoteMetaServiceService.getClass().getDeclaredField("meta");
+      remoteMetaServiceServiceMetaF.setAccessible(true);
+      JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
+      Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("statementCache");
+      jdbcMetaStatementMapF.setAccessible(true);
+      //noinspection unchecked
+      @SuppressWarnings("unchecked")
+      Cache<Integer, Object> cache = (Cache<Integer, Object>) jdbcMetaStatementMapF.get(serverMeta);
+      return cache;
+    }
+
+    @Override public Cache<String, Connection>
+    getRemoteConnectionMap(AvaticaConnection connection) throws Exception {
+      Field metaF = AvaticaConnection.class.getDeclaredField("meta");
+      metaF.setAccessible(true);
+      Meta clientMeta = (Meta) metaF.get(connection);
+      Field remoteMetaServiceF = clientMeta.getClass().getDeclaredField("service");
+      remoteMetaServiceF.setAccessible(true);
+      LocalProtobufService remoteMetaService =
+          (LocalProtobufService) remoteMetaServiceF.get(clientMeta);
+      // Get the field explicitly off the correct class to avoid LocalLoggingJsonService.class
+      Field remoteMetaServiceServiceF = LocalProtobufService.class.getDeclaredField("service");
+      remoteMetaServiceServiceF.setAccessible(true);
+      LocalService remoteMetaServiceService =
+          (LocalService) remoteMetaServiceServiceF.get(remoteMetaService);
+      Field remoteMetaServiceServiceMetaF =
+          remoteMetaServiceService.getClass().getDeclaredField("meta");
+      remoteMetaServiceServiceMetaF.setAccessible(true);
+      JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
+      Field jdbcMetaConnectionCacheF = JdbcMeta.class.getDeclaredField("connectionCache");
+      jdbcMetaConnectionCacheF.setAccessible(true);
+      //noinspection unchecked
+      @SuppressWarnings("unchecked")
+      Cache<String, Connection> cache =
+          (Cache<String, Connection>) jdbcMetaConnectionCacheF.get(serverMeta);
+      return cache;
+    }
+  }
+
+  /**
+   * Provides access to a log of requests.
+   */
+  interface RequestInspection {
+    RequestLogger getRequestLogger();
+  }
+
   /** Extension to {@link LocalJsonService} that writes requests and responses
    * into a thread-local. */
-  private static class LoggingLocalJsonService extends LocalJsonService {
+  private static class LoggingLocalJsonService extends LocalJsonService
+      implements RequestInspection {
     private static final ThreadLocal<RequestLogger> THREAD_LOG =
         new ThreadLocal<RequestLogger>() {
           @Override protected RequestLogger initialValue() {
@@ -1021,6 +1209,10 @@ public class RemoteDriverTest {
       logger.requestEnd(request, response);
       return response;
     }
+
+    @Override public RequestLogger getRequestLogger() {
+      return THREAD_LOG.get();
+    }
   }
 
   /** Logs request and response strings if enabled. */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
index 4cef99e..c4545e3 100644
--- a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java
@@ -22,14 +22,20 @@ import org.apache.calcite.avatica.ConnectionPropertiesImpl;
 import org.apache.calcite.avatica.ConnectionSpec;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.jdbc.JdbcMeta;
+import org.apache.calcite.avatica.server.AvaticaHandler;
+import org.apache.calcite.avatica.server.AvaticaProtobufHandler;
 import org.apache.calcite.avatica.server.HttpServer;
 import org.apache.calcite.avatica.server.Main;
+import org.apache.calcite.avatica.server.Main.HandlerFactory;
 
 import com.google.common.cache.Cache;
 
+import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -38,6 +44,7 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -48,13 +55,14 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /** Tests covering {@link RemoteMeta}. */
+@RunWith(Parameterized.class)
 public class RemoteMetaTest {
   private static final ConnectionSpec CONNECTION_SPEC = ConnectionSpec.HSQLDB;
 
-  private static HttpServer start;
-  private static String url;
+  // Keep a reference to the servers we start to clean them up after
+  private static final List<HttpServer> ACTIVE_SERVERS = new ArrayList<>();
 
-  /** Factory that provides a JMeta */
+  /** Factory that provides a {@link JdbcMeta}. */
   public static class FullyRemoteJdbcMetaFactory implements Meta.Factory {
 
     private static JdbcMeta instance = null;
@@ -76,15 +84,49 @@ public class RemoteMetaTest {
     }
   }
 
-  @BeforeClass public static void beforeClass() throws Exception {
-    start = Main.start(new String[] { FullyRemoteJdbcMetaFactory.class.getName() });
-    final int port = start.getPort();
-    url = "jdbc:avatica:remote:url=http://localhost:" + port;
+  @Parameters
+  public static List<Object[]> parameters() throws Exception {
+    List<Object[]> params = new ArrayList<>();
+
+    final String[] mainArgs = new String[] { FullyRemoteJdbcMetaFactory.class.getName() };
+
+    // Bind to '0' to pluck an ephemeral port instead of expecting a certain one to be free
+
+    final HttpServer jsonServer = Main.start(mainArgs, 0, new HandlerFactory() {
+      @Override public AbstractHandler createHandler(Service service) {
+        return new AvaticaHandler(service);
+      }
+    });
+    params.add(new Object[] {jsonServer, Driver.Serialization.JSON});
+    ACTIVE_SERVERS.add(jsonServer);
+
+    final HttpServer protobufServer = Main.start(mainArgs, 0, new HandlerFactory() {
+      @Override public AbstractHandler createHandler(Service service) {
+        return new AvaticaProtobufHandler(service);
+      }
+    });
+    params.add(new Object[] {protobufServer, Driver.Serialization.PROTOBUF});
+
+    ACTIVE_SERVERS.add(protobufServer);
+
+    return params;
+  }
+
+  private final HttpServer server;
+  private final String url;
+
+  public RemoteMetaTest(HttpServer server, Driver.Serialization serialization) {
+    this.server = server;
+    final int port = server.getPort();
+    url = "jdbc:avatica:remote:url=http://localhost:" + port + ";serialization="
+        + serialization.name();
   }
 
   @AfterClass public static void afterClass() throws Exception {
-    if (start != null) {
-      start.stop();
+    for (HttpServer server : ACTIVE_SERVERS) {
+      if (server != null) {
+        server.stop();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica-server/src/test/java/org/apache/calcite/avatica/server/HandlerFactoryTest.java
----------------------------------------------------------------------
diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/server/HandlerFactoryTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/server/HandlerFactoryTest.java
new file mode 100644
index 0000000..13e9f7d
--- /dev/null
+++ b/avatica-server/src/test/java/org/apache/calcite/avatica/server/HandlerFactoryTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.remote.Driver.Serialization;
+import org.apache.calcite.avatica.remote.Service;
+
+import org.eclipse.jetty.server.Handler;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link HandlerFactory} implementation.
+ */
+public class HandlerFactoryTest {
+
+  private HandlerFactory factory;
+  private Service service;
+
+  @Before
+  public void setup() {
+    this.factory = new HandlerFactory();
+    this.service = Mockito.mock(Service.class);
+  }
+
+  @Test
+  public void testJson() {
+    Handler handler = factory.getHandler(service, Serialization.JSON);
+    assertTrue("Expected an implementation of the AvaticaHandler, "
+        + "but got " + handler.getClass(), handler instanceof AvaticaHandler);
+  }
+
+  @Test
+  public void testProtobuf() {
+    Handler handler = factory.getHandler(service, Serialization.PROTOBUF);
+    assertTrue("Expected an implementation of the AvaticaProtobufHandler, "
+        + "but got " + handler.getClass(), handler instanceof AvaticaProtobufHandler);
+  }
+}
+
+// End HandlerFactoryTest.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/pom.xml
----------------------------------------------------------------------
diff --git a/avatica/pom.xml b/avatica/pom.xml
index 65e838a..e75de0e 100644
--- a/avatica/pom.xml
+++ b/avatica/pom.xml
@@ -49,6 +49,10 @@ limitations under the License.
       <artifactId>jackson-databind</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -58,6 +62,11 @@ limitations under the License.
       <artifactId>hamcrest-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -105,6 +114,27 @@ limitations under the License.
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <relocations>
+                <relocation>
+                  <pattern>com.google.protobuf</pattern>
+                  <shadedPattern>org.apache.calcite.avatica.com.google.protobuf</shadedPattern>
+                </relocation>
+              </relocations>
+              <createDependencyReducedPom>false</createDependencyReducedPom>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/AvaticaParameter.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaParameter.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaParameter.java
index ac011ed..0f0b473 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaParameter.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaParameter.java
@@ -16,6 +16,8 @@
  */
 package org.apache.calcite.avatica;
 
+import org.apache.calcite.avatica.proto.Common;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -49,6 +51,91 @@ public class AvaticaParameter {
     this.name = name;
   }
 
+  public Common.AvaticaParameter toProto() {
+    Common.AvaticaParameter.Builder builder = Common.AvaticaParameter.newBuilder();
+
+    builder.setSigned(signed);
+    builder.setPrecision(precision);
+    builder.setScale(scale);
+    builder.setParameterType(parameterType);
+    builder.setTypeName(typeName);
+    builder.setClassName(className);
+    builder.setName(name);
+
+    return builder.build();
+  }
+
+  public static AvaticaParameter fromProto(Common.AvaticaParameter proto) {
+    return new AvaticaParameter(proto.getSigned(), proto.getPrecision(),
+        proto.getScale(), proto.getParameterType(), proto.getTypeName(),
+        proto.getClassName(), proto.getName());
+  }
+
+  @Override public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((className == null) ? 0 : className.hashCode());
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    result = prime * result + parameterType;
+    result = prime * result + precision;
+    result = prime * result + scale;
+    result = prime * result + (signed ? 1231 : 1237);
+    result = prime * result + ((typeName == null) ? 0 : typeName.hashCode());
+    return result;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof AvaticaParameter) {
+      AvaticaParameter other = (AvaticaParameter) obj;
+
+      if (null == className) {
+        if (null != other.className) {
+          return false;
+        }
+      } else if (!className.equals(other.className)) {
+        return false;
+      }
+
+      if (null == name) {
+        if (null != other.name) {
+          return false;
+        }
+      } else if (!name.equals(other.name)) {
+        return false;
+      }
+
+      if (parameterType != other.parameterType) {
+        return false;
+      }
+
+      if (precision != other.precision) {
+        return false;
+      }
+
+      if (scale != other.scale) {
+        return false;
+      }
+
+      if (signed != other.signed) {
+        return false;
+      }
+
+      if (null == typeName) {
+        if (null != other.typeName) {
+          return false;
+        }
+      } else if (!typeName.equals(other.typeName)) {
+        return false;
+      }
+
+      return true;
+    }
+
+    return false;
+  }
 }
 
 // End AvaticaParameter.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
index a9975e0..6dd076b 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
@@ -196,6 +196,15 @@ public class AvaticaUtils {
 
   /** Reads the contents of an input stream and returns as a string. */
   public static String readFully(InputStream inputStream) throws IOException {
+    return _readFully(inputStream).toString();
+  }
+
+  public static byte[] readFullyToBytes(InputStream inputStream) throws IOException {
+    return _readFully(inputStream).toByteArray();
+  }
+
+  /** Reads the contents of an input stream and returns a ByteArrayOutputStrema. */
+  static ByteArrayOutputStream _readFully(InputStream inputStream) throws IOException {
     final byte[] bytes = new byte[4096];
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     for (;;) {
@@ -205,7 +214,7 @@ public class AvaticaUtils {
       }
       baos.write(bytes, 0, count);
     }
-    return baos.toString();
+    return baos;
   }
 
   /** Invokes {@code Statement#setLargeMaxRows}, falling back on

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java b/avatica/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
index e2b7b98..558d7ba 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/BuiltInConnectionProperty.java
@@ -37,7 +37,10 @@ public enum BuiltInConnectionProperty implements ConnectionProperty {
   TIMEZONE("timezone", Type.STRING, null, false),
 
   /** Remote URL. */
-  URL("url", Type.STRING, null, false);
+  URL("url", Type.STRING, null, false),
+
+  /** Serialization used over remote connections */
+  SERIALIZATION("serialization", Type.STRING, "json", false);
 
   private final String camelName;
   private final Type type;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/ColumnMetaData.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/ColumnMetaData.java b/avatica/src/main/java/org/apache/calcite/avatica/ColumnMetaData.java
index 2d7821e..b1e70c4 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/ColumnMetaData.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/ColumnMetaData.java
@@ -16,21 +16,23 @@
  */
 package org.apache.calcite.avatica;
 
+import org.apache.calcite.avatica.proto.Common;
 import org.apache.calcite.avatica.util.ByteString;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.protobuf.Descriptors.Descriptor;
 
 import java.lang.reflect.Type;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -39,7 +41,8 @@ import java.util.Objects;
 
 /**
  * Metadata for a column.
- * (Compare with {@link ResultSetMetaData}.)
+ *
+ * <p>(Compare with {@link java.sql.ResultSetMetaData}.)
  */
 public class ColumnMetaData {
   public final int ordinal; // 0-based
@@ -114,6 +117,247 @@ public class ColumnMetaData {
     this.columnClassName = columnClassName;
   }
 
+  public Common.ColumnMetaData toProto() {
+    Common.ColumnMetaData.Builder builder = Common.ColumnMetaData.newBuilder();
+
+    // Primitive fields (can't be null)
+    builder.setOrdinal(ordinal)
+      .setAutoIncrement(autoIncrement)
+      .setCaseSensitive(caseSensitive)
+      .setSearchable(searchable)
+      .setCurrency(currency)
+      .setNullable(nullable)
+      .setSigned(signed)
+      .setDisplaySize(displaySize)
+      .setPrecision(precision)
+      .setScale(scale)
+      .setReadOnly(readOnly)
+      .setWritable(writable)
+      .setDefinitelyWritable(definitelyWritable);
+
+    // Potentially null fields
+    if (null != label) {
+      builder.setLabel(label);
+    }
+
+    if (null != columnName) {
+      builder.setColumnName(columnName);
+    }
+
+    if (null != schemaName) {
+      builder.setSchemaName(schemaName);
+    }
+
+    if (null != tableName) {
+      builder.setTableName(tableName);
+    }
+
+    if (null != catalogName) {
+      builder.setCatalogName(catalogName);
+    }
+
+    if (null != type) {
+      builder.setType(type.toProto());
+    }
+
+    if (null != columnClassName) {
+      builder.setColumnClassName(columnClassName);
+    }
+
+    return builder.build();
+  }
+
+  public static ColumnMetaData fromProto(Common.ColumnMetaData proto) {
+    AvaticaType nestedType = AvaticaType.fromProto(proto.getType());
+    final Descriptor desc = proto.getDescriptorForType();
+
+    String catalogName = null;
+    if (proto.hasField(desc.findFieldByNumber(Common.ColumnMetaData.CATALOG_NAME_FIELD_NUMBER))) {
+      catalogName = proto.getCatalogName();
+    }
+
+    String schemaName = null;
+    if (proto.hasField(desc.findFieldByNumber(Common.ColumnMetaData.SCHEMA_NAME_FIELD_NUMBER))) {
+      schemaName = proto.getSchemaName();
+    }
+
+    String label = null;
+    if (proto.hasField(desc.findFieldByNumber(Common.ColumnMetaData.LABEL_FIELD_NUMBER))) {
+      label = proto.getLabel();
+    }
+
+    String columnName = null;
+    if (proto.hasField(desc.findFieldByNumber(Common.ColumnMetaData.COLUMN_NAME_FIELD_NUMBER))) {
+      columnName = proto.getColumnName();
+    }
+
+    String tableName = null;
+    if (proto.hasField(desc.findFieldByNumber(Common.ColumnMetaData.TABLE_NAME_FIELD_NUMBER))) {
+      tableName = proto.getTableName();
+    }
+
+    String columnClassName = null;
+    if (proto.hasField(
+        desc.findFieldByNumber(Common.ColumnMetaData.COLUMN_CLASS_NAME_FIELD_NUMBER))) {
+      columnClassName = proto.getColumnClassName();
+    }
+
+    // Recreate the ColumnMetaData
+    return new ColumnMetaData(proto.getOrdinal(), proto.getAutoIncrement(),
+        proto.getCaseSensitive(), proto.getSearchable(), proto.getCurrency(), proto.getNullable(),
+        proto.getSigned(), proto.getDisplaySize(), label, columnName,
+        schemaName, proto.getPrecision(), proto.getScale(), tableName,
+        catalogName, nestedType, proto.getReadOnly(), proto.getWritable(),
+        proto.getDefinitelyWritable(), columnClassName);
+  }
+
+  @Override public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (autoIncrement ? 1231 : 1237);
+    result = prime * result + (caseSensitive ? 1231 : 1237);
+    result = prime * result + ((catalogName == null) ? 0 : catalogName.hashCode());
+    result = prime * result + ((columnClassName == null) ? 0 : columnClassName.hashCode());
+    result = prime * result + ((columnName == null) ? 0 : columnName.hashCode());
+    result = prime * result + (currency ? 1231 : 1237);
+    result = prime * result + (definitelyWritable ? 1231 : 1237);
+    result = prime * result + displaySize;
+    result = prime * result + ((label == null) ? 0 : label.hashCode());
+    result = prime * result + nullable;
+    result = prime * result + ordinal;
+    result = prime * result + precision;
+    result = prime * result + (readOnly ? 1231 : 1237);
+    result = prime * result + scale;
+    result = prime * result + ((schemaName == null) ? 0 : schemaName.hashCode());
+    result = prime * result + (searchable ? 1231 : 1237);
+    result = prime * result + (signed ? 1231 : 1237);
+    result = prime * result + ((tableName == null) ? 0 : tableName.hashCode());
+    result = prime * result + ((type == null) ? 0 : type.hashCode());
+    result = prime * result + (writable ? 1231 : 1237);
+    return result;
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof ColumnMetaData) {
+      ColumnMetaData other = (ColumnMetaData) obj;
+
+      if (autoIncrement != other.autoIncrement) {
+        return false;
+      }
+
+      if (caseSensitive != other.caseSensitive) {
+        return false;
+      }
+
+      if (null == catalogName) {
+        if (null != other.catalogName) {
+          return false;
+        }
+      } else if (!catalogName.equals(other.catalogName)) {
+        return false;
+      }
+
+      if (null == columnClassName) {
+        if (null != other.columnClassName) {
+          return false;
+        }
+      } else if (!columnClassName.equals(other.columnClassName)) {
+        return false;
+      }
+
+      if (null == columnName) {
+        if (null != other.columnName) {
+          return false;
+        }
+      } else if (!columnName.equals(other.columnName)) {
+        return false;
+      }
+
+      if (currency != other.currency) {
+        return false;
+      }
+
+      if (definitelyWritable != other.definitelyWritable) {
+        return false;
+      }
+
+      if (displaySize != other.displaySize) {
+        return false;
+      }
+
+      if (null == label) {
+        if (null != other.label) {
+          return false;
+        }
+      } else if (!label.equals(other.label)) {
+        return false;
+      }
+
+      if (nullable != other.nullable) {
+        return false;
+      }
+
+      if (ordinal != other.ordinal) {
+        return false;
+      }
+
+      if (precision != other.precision) {
+        return false;
+      }
+
+      if (readOnly != other.readOnly) {
+        return false;
+      }
+
+      if (scale != other.scale) {
+        return false;
+      }
+
+      if (null == schemaName) {
+        if (null != other.schemaName) {
+          return false;
+        }
+      } else if (!schemaName.equals(other.schemaName)) {
+        return false;
+      }
+
+      if (searchable != other.searchable) {
+        return false;
+      }
+
+      if (signed != other.signed) {
+        return false;
+      }
+
+      if (null == tableName) {
+        if (null != other.tableName) {
+          return false;
+        }
+      } else if (!tableName.equals(other.tableName)) {
+        return false;
+      }
+
+      if (null == type) {
+        if (null != other.type) {
+          return false;
+        }
+      } else if (!type.equals(other.type)) {
+        return false;
+      }
+
+      if (writable != other.writable) {
+        return false;
+      }
+
+      return true;
+    }
+
+    return false;
+  }
+
   private static <T> T first(T t0, T t1) {
     return t0 != null ? t0 : t1;
   }
@@ -136,7 +380,7 @@ public class ColumnMetaData {
 
   /** Creates a ColumnMetaData for result sets that are not based on a struct
    * but need to have a single 'field' for purposes of
-   * {@link ResultSetMetaData}. */
+   * {@link java.sql.ResultSetMetaData}. */
   public static ColumnMetaData dummy(AvaticaType type, boolean nullable) {
     return new ColumnMetaData(
         0,
@@ -272,6 +516,14 @@ public class ColumnMetaData {
         return resultSet.getObject(i);
       }
     }
+
+    public Common.Rep toProto() {
+      return Common.Rep.valueOf(name());
+    }
+
+    public static Rep fromProto(Common.Rep proto) {
+      return Rep.valueOf(proto.name());
+    }
   }
 
   /** Base class for a column type. */
@@ -290,7 +542,7 @@ public class ColumnMetaData {
     /** The type of the field that holds the value. Not a JDBC property. */
     public final Rep rep;
 
-    protected AvaticaType(int id, String name, Rep rep) {
+    public AvaticaType(int id, String name, Rep rep) {
       this.id = id;
       this.name = Objects.requireNonNull(name);
       this.rep = Objects.requireNonNull(rep);
@@ -303,6 +555,79 @@ public class ColumnMetaData {
     public AvaticaType setRep(Rep rep) {
       throw new UnsupportedOperationException();
     }
+
+    public Common.AvaticaType toProto() {
+      Common.AvaticaType.Builder builder = Common.AvaticaType.newBuilder();
+
+      builder.setName(name);
+      builder.setId(id);
+      builder.setRep(rep.toProto());
+
+      return builder.build();
+    }
+
+    public static AvaticaType fromProto(Common.AvaticaType proto) {
+      Common.Rep repProto = proto.getRep();
+      Rep rep = Rep.valueOf(repProto.name());
+      AvaticaType type;
+
+      if (proto.hasComponent()) {
+        // ArrayType
+        // recurse on the type for the array elements
+        AvaticaType nestedType = AvaticaType.fromProto(proto.getComponent());
+        type = ColumnMetaData.array(nestedType, proto.getName(), rep);
+      } else if (proto.getColumnsCount() > 0) {
+        // StructType
+        List<ColumnMetaData> columns = new ArrayList<>(proto.getColumnsCount());
+        for (Common.ColumnMetaData protoColumn : proto.getColumnsList()) {
+          columns.add(ColumnMetaData.fromProto(protoColumn));
+        }
+        type = ColumnMetaData.struct(columns);
+      } else {
+        // ScalarType
+        type = ColumnMetaData.scalar(proto.getId(), proto.getName(), rep);
+      }
+
+      return type;
+    }
+
+    @Override public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + id;
+      result = prime * result + ((name == null) ? 0 : name.hashCode());
+      result = prime * result + ((rep == null) ? 0 : rep.hashCode());
+      return result;
+    }
+
+    @Override public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o instanceof AvaticaType) {
+        AvaticaType other = (AvaticaType) o;
+
+        if (id != other.id) {
+          return false;
+        }
+
+        if (name == null) {
+          if (other.name != null) {
+            return false;
+          }
+        } else if (!name.equals(other.name)) {
+          return false;
+        }
+
+        if (rep != other.rep) {
+          return false;
+        }
+
+        return true;
+      }
+
+      return false;
+    }
   }
 
   /** Scalar type. */
@@ -328,6 +653,41 @@ public class ColumnMetaData {
       super(Types.STRUCT, "STRUCT", ColumnMetaData.Rep.OBJECT);
       this.columns = columns;
     }
+
+    @Override public Common.AvaticaType toProto() {
+      Common.AvaticaType.Builder builder = Common.AvaticaType.newBuilder(super.toProto());
+      for (ColumnMetaData valueType : columns) {
+        builder.addColumns(valueType.toProto());
+      }
+      return builder.build();
+    }
+
+    @Override public int hashCode() {
+      return 31 * (super.hashCode() + (null == columns ? 0 : columns.hashCode()));
+    }
+
+    @Override public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+
+      if (o instanceof StructType) {
+        StructType other = (StructType) o;
+
+        if (null == columns) {
+          if (null != other.columns) {
+            return false;
+          }
+        }
+
+        return columns.equals(other.columns);
+      }
+
+      return false;
+    }
   }
 
   /** Array type. */
@@ -339,6 +699,41 @@ public class ColumnMetaData {
       super(type, typeName, representation);
       this.component = component;
     }
+
+    @Override public Common.AvaticaType toProto() {
+      Common.AvaticaType.Builder builder = Common.AvaticaType.newBuilder(super.toProto());
+
+      builder.setComponent(component.toProto());
+
+      return builder.build();
+    }
+
+    @Override public int hashCode() {
+      return 31 * (super.hashCode() + (null == component ? 0 : component.hashCode()));
+    }
+
+    @Override public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+
+      if (o instanceof ArrayType) {
+        ArrayType other = (ArrayType) o;
+
+        if (null == component) {
+          if (null != other.component) {
+            return false;
+          }
+        }
+
+        return component.equals(other.component);
+      }
+
+      return false;
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java b/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
index 31e8692..1eed8dc 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfig.java
@@ -26,6 +26,7 @@ public interface ConnectionConfig {
   String timeZone();
   Service.Factory factory();
   String url();
+  String serialization();
 }
 
 // End ConnectionConfig.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
index 3200bee..662c03a 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/ConnectionConfigImpl.java
@@ -47,6 +47,10 @@ public class ConnectionConfigImpl implements ConnectionConfig {
     return BuiltInConnectionProperty.URL.wrap(properties).getString();
   }
 
+  public String serialization() {
+    return BuiltInConnectionProperty.SERIALIZATION.wrap(properties).getString();
+  }
+
   /** Converts a {@link Properties} object containing (name, value)
    * pairs into a map whose keys are
    * {@link org.apache.calcite.avatica.InternalProperty} objects.

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/ConnectionPropertiesImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/ConnectionPropertiesImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/ConnectionPropertiesImpl.java
index 6086b35..4bef781 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/ConnectionPropertiesImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/ConnectionPropertiesImpl.java
@@ -16,8 +16,12 @@
  */
 package org.apache.calcite.avatica;
 
+import org.apache.calcite.avatica.proto.Common;
+import org.apache.calcite.avatica.remote.ProtobufService;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.protobuf.Descriptors.Descriptor;
 
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -175,6 +179,149 @@ public class ConnectionPropertiesImpl implements Meta.ConnectionProperties {
   public String getSchema() {
     return this.schema;
   }
+
+  @Override public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((autoCommit == null) ? 0 : autoCommit.hashCode());
+    result = prime * result + ((catalog == null) ? 0 : catalog.hashCode());
+    result = prime * result + (isDirty ? 1231 : 1237);
+    result = prime * result + ((readOnly == null) ? 0 : readOnly.hashCode());
+    result = prime * result + ((schema == null) ? 0 : schema.hashCode());
+    result = prime * result
+        + ((transactionIsolation == null) ? 0 : transactionIsolation.hashCode());
+    return result;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o instanceof ConnectionPropertiesImpl) {
+      ConnectionPropertiesImpl other = (ConnectionPropertiesImpl) o;
+
+      if (null == autoCommit) {
+        if (null != other.autoCommit) {
+          return false;
+        }
+      } else if (!autoCommit.equals(other.autoCommit)) {
+        return false;
+      }
+
+      if (null == catalog) {
+        if (null != other.catalog) {
+          return false;
+        }
+      } else if (!catalog.equals(other.catalog)) {
+        return false;
+      }
+
+      if (isDirty != other.isDirty) {
+        return false;
+      }
+
+      if (null == readOnly) {
+        if (null != other.readOnly) {
+          return false;
+        }
+      } else if (!readOnly.equals(other.readOnly)) {
+        return false;
+      }
+
+      if (null == schema) {
+        if (null != other.schema) {
+          return false;
+        }
+      } else if (!schema.equals(other.schema)) {
+        return false;
+      }
+
+      if (null == transactionIsolation) {
+        if (null != other.transactionIsolation) {
+          return false;
+        }
+      } else if (!transactionIsolation.equals(other.transactionIsolation)) {
+        return false;
+      }
+
+      return true;
+    }
+
+    return false;
+  }
+
+  public Common.ConnectionProperties toProto() {
+    Common.ConnectionProperties.Builder builder = Common.ConnectionProperties.newBuilder();
+
+    if (null != autoCommit) {
+      builder.setHasAutoCommit(true);
+      builder.setAutoCommit(autoCommit.booleanValue());
+    } else {
+      // Be explicit to avoid default value confusion
+      builder.setHasAutoCommit(false);
+    }
+
+    if (null != catalog) {
+      builder.setCatalog(catalog);
+    }
+
+    builder.setIsDirty(isDirty);
+
+    if (null != readOnly) {
+      builder.setHasReadOnly(true);
+      builder.setReadOnly(readOnly.booleanValue());
+    } else {
+      // Be explicit to avoid default value confusion
+      builder.setHasReadOnly(false);
+    }
+
+    if (null != schema) {
+      builder.setSchema(schema);
+    }
+
+    if (null != transactionIsolation) {
+      builder.setTransactionIsolation(transactionIsolation.intValue());
+    }
+
+    return builder.build();
+  }
+
+  public static ConnectionPropertiesImpl fromProto(Common.ConnectionProperties proto) {
+    final Descriptor desc = proto.getDescriptorForType();
+
+    String catalog = null;
+    if (ProtobufService.hasField(proto, desc, Common.ConnectionProperties.CATALOG_FIELD_NUMBER)) {
+      catalog = proto.getCatalog();
+    }
+
+    String schema = null;
+    if (ProtobufService.hasField(proto, desc, Common.ConnectionProperties.SCHEMA_FIELD_NUMBER)) {
+      schema = proto.getSchema();
+    }
+
+    Boolean autoCommit = null;
+    if (proto.getHasAutoCommit()) {
+      autoCommit = Boolean.valueOf(proto.getAutoCommit());
+    }
+
+    Boolean readOnly = null;
+    if (proto.getHasReadOnly()) {
+      readOnly = Boolean.valueOf(proto.getReadOnly());
+    }
+
+    Integer transactionIsolation = null;
+    if (ProtobufService.hasField(proto, desc,
+        Common.ConnectionProperties.TRANSACTION_ISOLATION_FIELD_NUMBER)) {
+      transactionIsolation = Integer.valueOf(proto.getTransactionIsolation());
+    }
+
+    ConnectionPropertiesImpl impl = new ConnectionPropertiesImpl(autoCommit, readOnly,
+        transactionIsolation, catalog, schema);
+
+    impl.setDirty(proto.getIsDirty());
+
+    return impl;
+  }
 }
 
 // End ConnectionPropertiesImpl.java