You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/03/07 19:28:04 UTC

[26/59] [partial] calcite git commit: [CALCITE-1078] Detach avatica from the core calcite Maven project

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
new file mode 100644
index 0000000..ff27d05
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+import java.util.Objects;
+
+/**
+ * All we know about a statement. Encapsulates a {@link ResultSet}.
+ */
+public class StatementInfo {
+  private volatile Boolean relativeSupported = null;
+
+  final Statement statement; // sometimes a PreparedStatement
+  private ResultSet resultSet;
+  private long position = 0;
+
+  // True when setResultSet(ResultSet) is called to let us determine the difference between
+  // a null ResultSet (from an update) from the lack of a ResultSet.
+  private boolean resultsInitialized = false;
+
+  public StatementInfo(Statement statement) {
+    this.statement = Objects.requireNonNull(statement);
+  }
+
+  // Visible for testing
+  void setPosition(long position) {
+    this.position = position;
+  }
+
+  // Visible for testing
+  long getPosition() {
+    return this.position;
+  }
+
+  /**
+   * Set a ResultSet on this object.
+   *
+   * @param resultSet The current ResultSet
+   */
+  public void setResultSet(ResultSet resultSet) {
+    resultsInitialized = true;
+    this.resultSet = resultSet;
+  }
+
+  /**
+   * @return The {@link ResultSet} for this Statement, may be null.
+   */
+  public ResultSet getResultSet() {
+    return this.resultSet;
+  }
+
+  /**
+   * @return True if {@link #setResultSet(ResultSet)} was ever invoked.
+   */
+  public boolean isResultSetInitialized() {
+    return resultsInitialized;
+  }
+
+  /**
+   * @see ResultSet#next()
+   */
+  public boolean next() throws SQLException {
+    return _next(resultSet);
+  }
+
+  boolean _next(ResultSet results) throws SQLException {
+    boolean ret = results.next();
+    position++;
+    return ret;
+  }
+
+  /**
+   * Consumes <code>offset - position</code> elements from the {@link ResultSet}.
+   *
+   * @param offset The offset to advance to
+   * @return True if the resultSet was advanced to the current point, false if insufficient rows
+   *      were present to advance to the requested offset.
+   */
+  public boolean advanceResultSetToOffset(ResultSet results, long offset) throws SQLException {
+    if (offset < 0 || offset < position) {
+      throw new IllegalArgumentException("Offset should be "
+        + " non-negative and not less than the current position. " + offset + ", " + position);
+    }
+    if (position >= offset) {
+      return true;
+    }
+
+    if (null == relativeSupported) {
+      Boolean moreResults = null;
+      synchronized (this) {
+        if (null == relativeSupported) {
+          try {
+            moreResults = advanceByRelative(results, offset);
+            relativeSupported = true;
+          } catch (SQLFeatureNotSupportedException e) {
+            relativeSupported = false;
+          }
+        }
+      }
+
+      if (null != moreResults) {
+        // We figured out whether or not relative is supported.
+        // Make sure we actually do the necessary work.
+        if (!relativeSupported) {
+          // We avoided calling advanceByNext in the synchronized block earlier.
+          moreResults = advanceByNext(results, offset);
+        }
+
+        return moreResults;
+      }
+
+      // Another thread updated the RELATIVE_SUPPORTED before we did, fall through.
+    }
+
+    if (relativeSupported) {
+      return advanceByRelative(results, offset);
+    } else {
+      return advanceByNext(results, offset);
+    }
+  }
+
+  private boolean advanceByRelative(ResultSet results, long offset) throws SQLException {
+    long diff = offset - position;
+    while (diff > Integer.MAX_VALUE) {
+      if (!results.relative(Integer.MAX_VALUE)) {
+        // Avoid updating position until relative succeeds.
+        position += Integer.MAX_VALUE;
+        return false;
+      }
+      // Avoid updating position until relative succeeds.
+      position += Integer.MAX_VALUE;
+      diff -= Integer.MAX_VALUE;
+    }
+    boolean ret = results.relative((int) diff);
+    // Make sure we only update the position after successfully calling relative(int).
+    position += diff;
+    return ret;
+  }
+
+  private boolean advanceByNext(ResultSet results, long offset) throws SQLException {
+    while (position < offset) {
+      // Advance while maintaining `position`
+      if (!_next(results)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+}
+
+// End StatementInfo.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java
new file mode 100644
index 0000000..8b8fb76
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Implements an Avatica provider on top of an existing JDBC data source. */
+package org.apache.calcite.avatica.jdbc;
+
+
+// End package-info.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
new file mode 100644
index 0000000..42b13c9
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+
+import org.eclipse.jetty.server.Handler;
+
+/**
+ * A custom interface that extends the Jetty interface to enable extra control within Avatica.
+ */
+public interface AvaticaHandler extends Handler {
+
+  void setServerRpcMetadata(RpcMetadataResponse metadata);
+
+}
+
+// End AvaticaHandler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
new file mode 100644
index 0000000..34a9333
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.AvaticaUtils;
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.Timer;
+import org.apache.calcite.avatica.metrics.Timer.Context;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
+import org.apache.calcite.avatica.remote.Handler.HandlerResponse;
+import org.apache.calcite.avatica.remote.JsonHandler;
+import org.apache.calcite.avatica.remote.Service;
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.calcite.avatica.remote.MetricsHelper.concat;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Jetty handler that executes Avatica JSON request-responses.
+ */
+public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareAvaticaHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class);
+
+  final Service service;
+  final JsonHandler jsonHandler;
+
+  final MetricsSystem metrics;
+  final Timer requestTimer;
+
+  final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer;
+
+  public AvaticaJsonHandler(Service service) {
+    this(service, NoopMetricsSystem.getInstance());
+  }
+
+  public AvaticaJsonHandler(Service service, MetricsSystem metrics) {
+    this.service = Objects.requireNonNull(service);
+    this.metrics = Objects.requireNonNull(metrics);
+    // Avatica doesn't have a Guava dependency
+    this.jsonHandler = new JsonHandler(service, this.metrics);
+
+    // Metrics
+    this.requestTimer = this.metrics.getTimer(
+        concat(AvaticaJsonHandler.class, MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME));
+
+    this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() {
+      @Override public UnsynchronizedBuffer initialValue() {
+        return new UnsynchronizedBuffer();
+      }
+    };
+  }
+
+  public void handle(String target, Request baseRequest,
+      HttpServletRequest request, HttpServletResponse response)
+      throws IOException, ServletException {
+    try (final Context ctx = requestTimer.start()) {
+      response.setContentType("application/json;charset=utf-8");
+      response.setStatus(HttpServletResponse.SC_OK);
+      if (request.getMethod().equals("POST")) {
+        // First look for a request in the header, then look in the body.
+        // The latter allows very large requests without hitting HTTP 413.
+        String rawRequest = request.getHeader("request");
+        if (rawRequest == null) {
+          // Avoid a new buffer creation for every HTTP request
+          final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
+          try (ServletInputStream inputStream = request.getInputStream()) {
+            rawRequest = AvaticaUtils.readFully(inputStream, buffer);
+          } finally {
+            // Reset the offset into the buffer after we're done
+            buffer.reset();
+          }
+        }
+        final String jsonRequest =
+            new String(rawRequest.getBytes("ISO-8859-1"), "UTF-8");
+        LOG.trace("request: {}", jsonRequest);
+
+        final HandlerResponse<String> jsonResponse = jsonHandler.apply(jsonRequest);
+        LOG.trace("response: {}", jsonResponse);
+        baseRequest.setHandled(true);
+        // Set the status code and write out the response.
+        response.setStatus(jsonResponse.getStatusCode());
+        response.getWriter().println(jsonResponse.getResponse());
+      }
+    }
+  }
+
+  @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) {
+    // Set the metadata for the normal service calls
+    service.setRpcMetadata(metadata);
+    // Also add it to the handler to include with exceptions
+    jsonHandler.setRpcMetadata(metadata);
+  }
+
+  @Override public MetricsSystem getMetrics() {
+    return metrics;
+  }
+}
+
+// End AvaticaJsonHandler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
new file mode 100644
index 0000000..27e73de
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.AvaticaUtils;
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.Timer;
+import org.apache.calcite.avatica.metrics.Timer.Context;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
+import org.apache.calcite.avatica.remote.Handler.HandlerResponse;
+import org.apache.calcite.avatica.remote.MetricsHelper;
+import org.apache.calcite.avatica.remote.ProtobufHandler;
+import org.apache.calcite.avatica.remote.ProtobufTranslation;
+import org.apache.calcite.avatica.remote.ProtobufTranslationImpl;
+import org.apache.calcite.avatica.remote.Service;
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Jetty handler that executes Avatica JSON request-responses.
+ */
+public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAwareAvaticaHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class);
+
+  private final Service service;
+  private final ProtobufHandler pbHandler;
+  private final ProtobufTranslation protobufTranslation;
+  private final MetricsSystem metrics;
+  private final Timer requestTimer;
+
+  final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer;
+
+  public AvaticaProtobufHandler(Service service) {
+    this(service, NoopMetricsSystem.getInstance());
+  }
+
+  public AvaticaProtobufHandler(Service service, MetricsSystem metrics) {
+    this.service = Objects.requireNonNull(service);
+    this.metrics = Objects.requireNonNull(metrics);
+
+    this.requestTimer = this.metrics.getTimer(
+        MetricsHelper.concat(AvaticaProtobufHandler.class,
+            MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME));
+
+    this.protobufTranslation = new ProtobufTranslationImpl();
+    this.pbHandler = new ProtobufHandler(service, protobufTranslation, metrics);
+
+    this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() {
+      @Override public UnsynchronizedBuffer initialValue() {
+        return new UnsynchronizedBuffer();
+      }
+    };
+  }
+
+  public void handle(String target, Request baseRequest,
+      HttpServletRequest request, HttpServletResponse response)
+      throws IOException, ServletException {
+    try (final Context ctx = this.requestTimer.start()) {
+      response.setContentType("application/octet-stream;charset=utf-8");
+      response.setStatus(HttpServletResponse.SC_OK);
+      if (request.getMethod().equals("POST")) {
+        byte[] requestBytes;
+        // Avoid a new buffer creation for every HTTP request
+        final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
+        try (ServletInputStream inputStream = request.getInputStream()) {
+          requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer);
+        } finally {
+          buffer.reset();
+        }
+
+        HandlerResponse<byte[]> handlerResponse = pbHandler.apply(requestBytes);
+
+        baseRequest.setHandled(true);
+        response.setStatus(handlerResponse.getStatusCode());
+        response.getOutputStream().write(handlerResponse.getResponse());
+      }
+    }
+  }
+
+  @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) {
+    // Set the metadata for the normal service calls
+    service.setRpcMetadata(metadata);
+    // Also add it to the handler to include with exceptions
+    pbHandler.setRpcMetadata(metadata);
+  }
+
+  @Override public MetricsSystem getMetrics() {
+    return this.metrics;
+  }
+
+}
+
+// End AvaticaProtobufHandler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java
new file mode 100644
index 0000000..a574985
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * An AvaticaHandler implementation that delegates to a provided Jetty Handler instance.
+ *
+ * <p>This implementation provides a no-op implementation for
+ * {@link #setServerRpcMetadata(org.apache.calcite.avatica.remote.Service.RpcMetadataResponse)}.
+ *
+ * Does not implement {@link MetricsAwareAvaticaHandler} as this implementation is only presented
+ * for backwards compatibility.
+ */
+public class DelegatingAvaticaHandler implements AvaticaHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(DelegatingAvaticaHandler.class);
+
+  private final Handler handler;
+
+  public DelegatingAvaticaHandler(Handler handler) {
+    this.handler = Objects.requireNonNull(handler);
+  }
+
+  @Override public void handle(String target, Request baseRequest, HttpServletRequest request,
+      HttpServletResponse response) throws IOException, ServletException {
+    handler.handle(target, baseRequest, request, response);
+  }
+
+  @Override public void setServer(Server server) {
+    handler.setServer(server);
+  }
+
+  @Override public Server getServer() {
+    return handler.getServer();
+  }
+
+  @Override public void destroy() {
+    handler.destroy();
+  }
+
+  @Override public void start() throws Exception {
+    handler.start();
+  }
+
+  @Override public void stop() throws Exception {
+    handler.stop();
+  }
+
+  @Override public boolean isRunning() {
+    return handler.isRunning();
+  }
+
+  @Override public boolean isStarted() {
+    return handler.isStarted();
+  }
+
+  @Override public boolean isStarting() {
+    return handler.isStarting();
+  }
+
+  @Override public boolean isStopping() {
+    return handler.isStopping();
+  }
+
+  @Override public boolean isStopped() {
+    return handler.isStopped();
+  }
+
+  @Override public boolean isFailed() {
+    return handler.isFailed();
+  }
+
+  @Override public void addLifeCycleListener(Listener listener) {
+    handler.addLifeCycleListener(listener);
+  }
+
+  @Override public void removeLifeCycleListener(Listener listener) {
+    handler.removeLifeCycleListener(listener);
+  }
+
+  @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) {
+    LOG.warn("Setting RpcMetadata is not implemented for DelegatingAvaticaHandler");
+  }
+
+}
+
+// End DelegatingAvaticaHandler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
new file mode 100644
index 0000000..b1fcb40
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.MetricsSystemConfiguration;
+import org.apache.calcite.avatica.metrics.MetricsSystemFactory;
+import org.apache.calcite.avatica.metrics.MetricsSystemLoader;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystemConfiguration;
+import org.apache.calcite.avatica.remote.Driver;
+import org.apache.calcite.avatica.remote.Service;
+
+import org.eclipse.jetty.server.Handler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.ServiceLoader;
+
+/**
+ * Factory that instantiates the desired implementation, typically differing on the method
+ * used to serialize messages, for use in the Avatica server.
+ */
+public class HandlerFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(HandlerFactory.class);
+
+  /**
+   * Constructs the desired implementation for the given serialization method with metrics.
+   *
+   * @param service The underlying {@link Service}.
+   * @param serialization The desired message serialization.
+   * @return The {@link Handler}.
+   */
+  public Handler getHandler(Service service, Driver.Serialization serialization) {
+    return getHandler(service, serialization, NoopMetricsSystemConfiguration.getInstance());
+  }
+
+  /**
+   * Constructs the desired implementation for the given serialization method with metrics.
+   *
+   * @param service The underlying {@link Service}.
+   * @param serialization The desired message serialization.
+   * @param metricsConfig Configuration for the {@link MetricsSystem}.
+   * @return The {@link Handler}.
+   */
+  public Handler getHandler(Service service, Driver.Serialization serialization,
+      MetricsSystemConfiguration<?> metricsConfig) {
+    MetricsSystem metrics = MetricsSystemLoader.load(Objects.requireNonNull(metricsConfig));
+
+    switch (serialization) {
+    case JSON:
+      return new AvaticaJsonHandler(service, metrics);
+    case PROTOBUF:
+      return new AvaticaProtobufHandler(service, metrics);
+    default:
+      throw new IllegalArgumentException("Unknown Avatica handler for " + serialization.name());
+    }
+  }
+
+  /**
+   * Load a {@link MetricsSystem} using ServiceLoader to create a {@link MetricsSystemFactory}.
+   *
+   * @param config State to pass to the factory for initialization.
+   * @return A {@link MetricsSystem} instance.
+   */
+  MetricsSystem loadMetricsSystem(MetricsSystemConfiguration<?> config) {
+    ServiceLoader<MetricsSystemFactory> loader = ServiceLoader.load(MetricsSystemFactory.class);
+    List<MetricsSystemFactory> availableFactories = new ArrayList<>();
+    for (MetricsSystemFactory factory : loader) {
+      availableFactories.add(factory);
+    }
+
+    if (1 == availableFactories.size()) {
+      // One and only one instance -- what we want
+      MetricsSystemFactory factory = availableFactories.get(0);
+      LOG.info("Loaded MetricsSystem {}", factory.getClass());
+      return factory.create(config);
+    } else if (availableFactories.isEmpty()) {
+      // None-provided default to no metrics
+      LOG.info("No metrics implementation available on classpath. Using No-op implementation");
+      return NoopMetricsSystem.getInstance();
+    } else {
+      // Tell the user they're doing something wrong, and choose the first impl.
+      StringBuilder sb = new StringBuilder();
+      for (MetricsSystemFactory factory : availableFactories) {
+        if (sb.length() > 0) {
+          sb.append(", ");
+        }
+        sb.append(factory.getClass());
+      }
+      LOG.warn("Found multiple MetricsSystemFactory implementations: {}."
+          + " Using No-op implementation", sb);
+      return NoopMetricsSystem.getInstance();
+    }
+  }
+}
+
+// End HandlerFactory.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
new file mode 100644
index 0000000..c81e899
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.DefaultHandler;
+import org.eclipse.jetty.server.handler.HandlerList;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Avatica HTTP server.
+ *
+ * <p>If you need to change the server's configuration, override the
+ * {@link #configureConnector(ServerConnector, int)} method in a derived class.
+ */
+public class HttpServer {
+  private static final Logger LOG = LoggerFactory.getLogger(HttpServer.class);
+
+  private Server server;
+  private int port = -1;
+  private final AvaticaHandler handler;
+
+  @Deprecated
+  public HttpServer(Handler handler) {
+    this(wrapJettyHandler(handler));
+  }
+
+  public HttpServer(AvaticaHandler handler) {
+    this(0, handler);
+  }
+
+  @Deprecated
+  public HttpServer(int port, Handler handler) {
+    this(port, wrapJettyHandler(handler));
+  }
+
+  public HttpServer(int port, AvaticaHandler handler) {
+    this.port = port;
+    this.handler = handler;
+  }
+
+  private static AvaticaHandler wrapJettyHandler(Handler handler) {
+    if (handler instanceof AvaticaHandler) {
+      return (AvaticaHandler) handler;
+    }
+    // Backwards compatibility, noop's the AvaticaHandler interface
+    return new DelegatingAvaticaHandler(handler);
+  }
+
+  public void start() {
+    if (server != null) {
+      throw new RuntimeException("Server is already started");
+    }
+
+    final QueuedThreadPool threadPool = new QueuedThreadPool();
+    threadPool.setDaemon(true);
+    server = new Server(threadPool);
+    server.manage(threadPool);
+
+    final ServerConnector connector = configureConnector(new ServerConnector(server), port);
+
+    server.setConnectors(new Connector[] { connector });
+
+    final HandlerList handlerList = new HandlerList();
+    handlerList.setHandlers(new Handler[] { handler, new DefaultHandler() });
+    server.setHandler(handlerList);
+    try {
+      server.start();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    port = connector.getLocalPort();
+
+    LOG.info("Service listening on port {}.", getPort());
+
+    // Set the information about the address for this server
+    try {
+      this.handler.setServerRpcMetadata(createRpcServerMetadata(connector));
+    } catch (UnknownHostException e) {
+      // Failed to do the DNS lookup, bail out.
+      throw new RuntimeException(e);
+    }
+  }
+
+  private RpcMetadataResponse createRpcServerMetadata(ServerConnector connector) throws
+      UnknownHostException {
+    String host = connector.getHost();
+    if (null == host) {
+      // "null" means binding to all interfaces, we need to pick one so the client gets a real
+      // address and not "0.0.0.0" or similar.
+      host = InetAddress.getLocalHost().getHostName();
+    }
+
+    final int port = connector.getLocalPort();
+
+    return new RpcMetadataResponse(String.format("%s:%d", host, port));
+  }
+
+  /**
+   * Configures the server connector.
+   *
+   * <p>The default configuration sets a timeout of 1 minute and disables
+   * TCP linger time.
+   *
+   * <p>To change the configuration, override this method in a derived class.
+   * The overriding method must call its super method.
+   *
+   * @param connector connector to be configured
+   * @param port port number handed over in constructor
+   */
+  protected ServerConnector configureConnector(ServerConnector connector, int port) {
+    connector.setIdleTimeout(60 * 1000);
+    connector.setSoLingerTime(-1);
+    connector.setPort(port);
+    return connector;
+  }
+
+  public void stop() {
+    if (server == null) {
+      throw new RuntimeException("Server is already stopped");
+    }
+
+    LOG.info("Service terminating.");
+    try {
+      final Server server1 = server;
+      port = -1;
+      server = null;
+      server1.stop();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void join() throws InterruptedException {
+    server.join();
+  }
+
+  public int getPort() {
+    return port;
+  }
+}
+
+// End HttpServer.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java
new file mode 100644
index 0000000..8b05931
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.remote.LocalService;
+import org.apache.calcite.avatica.remote.Service;
+
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import java.util.Arrays;
+
+/**
+ * Jetty handler that executes Avatica JSON request-responses.
+ */
+public class Main {
+  private Main() {}
+
+  public static void main(String[] args)
+      throws InterruptedException, ClassNotFoundException,
+      IllegalAccessException, InstantiationException {
+    HttpServer server = start(args);
+    server.join();
+  }
+
+  /**
+   * Factory that instantiates Jetty Handlers
+   */
+  public interface HandlerFactory {
+    AbstractHandler createHandler(Service service);
+  }
+
+  private static final HandlerFactory JSON_HANDLER_FACTORY = new HandlerFactory() {
+    public AbstractHandler createHandler(Service service) {
+      return new AvaticaJsonHandler(service);
+    }
+  };
+
+  /**
+   * Creates and starts an {@link HttpServer} using JSON POJO serialization of requests/responses.
+   *
+   * <p>Arguments are as follows:
+   * <ul>
+   *   <li>args[0]: the {@link org.apache.calcite.avatica.Meta.Factory} class
+   *       name
+   *   <li>args[1+]: arguments passed along to
+   *   {@link org.apache.calcite.avatica.Meta.Factory#create(java.util.List)}
+   * </ul>
+   *
+   * @param args Command-line arguments
+   */
+  public static HttpServer start(String[] args) throws ClassNotFoundException,
+         InstantiationException, IllegalAccessException {
+    return start(args, 8765, JSON_HANDLER_FACTORY);
+  }
+
+  /**
+   * Creates and starts an {@link HttpServer} using the given factory to create the Handler.
+   *
+   * <p>Arguments are as follows:
+   * <ul>
+   *   <li>args[0]: the {@link org.apache.calcite.avatica.Meta.Factory} class
+   *       name
+   *   <li>args[1+]: arguments passed along to
+   *   {@link org.apache.calcite.avatica.Meta.Factory#create(java.util.List)}
+   * </ul>
+   *
+   * @param args Command-line arguments
+   * @param port Server port to bind
+   * @param handlerFactory Factory to create the handler used by the server
+   */
+  public static HttpServer start(String[] args, int port, HandlerFactory handlerFactory)
+      throws ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
+    String factoryClassName = args[0];
+    Class<?> factoryClass = Class.forName(factoryClassName);
+    Meta.Factory factory = (Meta.Factory) factoryClass.newInstance();
+    Meta meta = factory.create(Arrays.asList(args).subList(1, args.length));
+    Service service = new LocalService(meta);
+    HttpServer server = new HttpServer(port, handlerFactory.createHandler(service));
+    server.start();
+    return server;
+  }
+}
+
+// End Main.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java
new file mode 100644
index 0000000..0914dbd
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+
+/**
+ * An {@link AvaticaHandler} that is capable of collecting metrics.
+ */
+public interface MetricsAwareAvaticaHandler extends AvaticaHandler {
+
+  /**
+   * General prefix for all metrics in a handler.
+   */
+  String HANDLER_PREFIX = "Handler.";
+
+  /**
+   * Name for timing requests from users
+   */
+  String REQUEST_TIMER_NAME = HANDLER_PREFIX + "RequestTimings";
+
+  /**
+   * @return An instance of the {@link MetricsSystem} for this AvaticaHandler.
+   */
+  MetricsSystem getMetrics();
+
+}
+
+// End MetricsAwareAvaticaHandler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java
new file mode 100644
index 0000000..f2b8728
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Avatica server that listens for HTTP requests.
+ */
+@PackageMarker
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java b/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java
new file mode 100644
index 0000000..ba4c5b8
--- /dev/null
+++ b/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/** Information necessary to create a JDBC connection. Specify one to run
+ * tests against a different database. (hsqldb is the default.) */
+public class ConnectionSpec {
+  public final String url;
+  public final String username;
+  public final String password;
+  public final String driver;
+
+  // CALCITE-687 HSQLDB seems to fail oddly when multiple tests are run concurrently
+  private static final ReentrantLock HSQLDB_LOCK = new ReentrantLock();
+
+  public ConnectionSpec(String url, String username, String password,
+      String driver) {
+    this.url = url;
+    this.username = username;
+    this.password = password;
+    this.driver = driver;
+  }
+
+  public static final ConnectionSpec HSQLDB =
+      new ConnectionSpec(ScottHsqldb.URI, ScottHsqldb.USER,
+          ScottHsqldb.PASSWORD, "org.hsqldb.jdbcDriver");
+
+  /**
+   * Return a lock used for controlling concurrent access to the database as it has been observed
+   * that concurrent access is causing problems with HSQLDB.
+   */
+  public static ReentrantLock getDatabaseLock() {
+    return HSQLDB_LOCK;
+  }
+}
+
+// End ConnectionSpec.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java b/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java
new file mode 100644
index 0000000..9749ef6
--- /dev/null
+++ b/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import org.apache.calcite.avatica.remote.MockJsonService;
+import org.apache.calcite.avatica.remote.MockProtobufService.MockProtobufServiceFactory;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * RemoteDriver tests that use a Mock implementation of a Connection.
+ */
+@RunWith(Parameterized.class)
+public class RemoteDriverMockTest {
+  public static final String MJS = MockJsonService.Factory.class.getName();
+  public static final String MPBS = MockProtobufServiceFactory.class.getName();
+
+  private static Connection mjs() throws SQLException {
+    return DriverManager.getConnection("jdbc:avatica:remote:factory=" + MJS);
+  }
+
+  private static Connection mpbs() throws SQLException {
+    return DriverManager.getConnection("jdbc:avatica:remote:factory=" + MPBS);
+  }
+
+  @Parameters
+  public static List<Object[]> parameters() {
+    List<Object[]> parameters = new ArrayList<>();
+
+    parameters.add(new Object[] {new Callable<Connection>() {
+      public Connection call() throws SQLException {
+        return mjs();
+      }
+    } });
+
+    parameters.add(new Object[] {new Callable<Connection>() {
+      public Connection call() throws SQLException {
+        return mpbs();
+      }
+    } });
+
+    return parameters;
+  }
+
+  private final Callable<Connection> connectionFunctor;
+
+  public RemoteDriverMockTest(Callable<Connection> functor) {
+    this.connectionFunctor = functor;
+  }
+
+  private Connection getMockConnection() {
+    try {
+      return connectionFunctor.call();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test public void testRegister() throws Exception {
+    final Connection connection = getMockConnection();
+    assertThat(connection.isClosed(), is(false));
+    connection.close();
+    assertThat(connection.isClosed(), is(true));
+  }
+
+  @Test public void testSchemas() throws Exception {
+    final Connection connection = getMockConnection();
+    final ResultSet resultSet =
+        connection.getMetaData().getSchemas(null, null);
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertTrue(metaData.getColumnCount() >= 2);
+    assertEquals("TABLE_CATALOG", metaData.getColumnName(1));
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(2));
+    resultSet.close();
+    connection.close();
+  }
+
+  @Test public void testTables() throws Exception {
+    final Connection connection = getMockConnection();
+    final ResultSet resultSet =
+        connection.getMetaData().getTables(null, null, null, new String[0]);
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertTrue(metaData.getColumnCount() >= 3);
+    assertEquals("TABLE_CAT", metaData.getColumnName(1));
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(2));
+    assertEquals("TABLE_NAME", metaData.getColumnName(3));
+    resultSet.close();
+    connection.close();
+  }
+
+  @Ignore
+  @Test public void testNoFactory() throws Exception {
+    final Connection connection =
+        DriverManager.getConnection("jdbc:avatica:remote:");
+    assertThat(connection.isClosed(), is(false));
+    final ResultSet resultSet = connection.getMetaData().getSchemas();
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertEquals(2, metaData.getColumnCount());
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(1));
+    assertEquals("TABLE_CATALOG", metaData.getColumnName(2));
+    resultSet.close();
+    connection.close();
+    assertThat(connection.isClosed(), is(true));
+  }
+
+  @Ignore
+  @Test public void testCatalogsMock() throws Exception {
+    final Connection connection = getMockConnection();
+    assertThat(connection.isClosed(), is(false));
+    final ResultSet resultSet = connection.getMetaData().getSchemas();
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertEquals(2, metaData.getColumnCount());
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(1));
+    assertEquals("TABLE_CATALOG", metaData.getColumnName(2));
+    resultSet.close();
+    connection.close();
+    assertThat(connection.isClosed(), is(true));
+  }
+
+  @Ignore
+  @Test public void testStatementExecuteQueryMock() throws Exception {
+    checkStatementExecuteQuery(getMockConnection(), false);
+  }
+
+  @Ignore
+  @Test public void testPrepareExecuteQueryMock() throws Exception {
+    checkStatementExecuteQuery(getMockConnection(), true);
+  }
+
+  private void checkStatementExecuteQuery(Connection connection,
+      boolean prepare) throws SQLException {
+    final String sql = "select * from (\n"
+        + "  values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)";
+    final Statement statement;
+    final ResultSet resultSet;
+    final ParameterMetaData parameterMetaData;
+    if (prepare) {
+      final PreparedStatement ps = connection.prepareStatement(sql);
+      statement = ps;
+      parameterMetaData = ps.getParameterMetaData();
+      resultSet = ps.executeQuery();
+    } else {
+      statement = connection.createStatement();
+      parameterMetaData = null;
+      resultSet = statement.executeQuery(sql);
+    }
+    if (parameterMetaData != null) {
+      assertThat(parameterMetaData.getParameterCount(), equalTo(0));
+    }
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertEquals(2, metaData.getColumnCount());
+    assertEquals("C1", metaData.getColumnName(1));
+    assertEquals("C2", metaData.getColumnName(2));
+    assertTrue(resultSet.next());
+    assertTrue(resultSet.next());
+    assertTrue(resultSet.next());
+    assertFalse(resultSet.next());
+    resultSet.close();
+    statement.close();
+    connection.close();
+  }
+
+  @Test public void testResultSetsFinagled() throws Exception {
+    // These values specified in MockJsonService
+    final String table = "my_table";
+    final long value = 10;
+
+    final Connection connection = getMockConnection();
+    // Not an accurate ResultSet per JDBC, but close enough for testing.
+    ResultSet results = connection.getMetaData().getColumns(null, null, table, null);
+    assertTrue(results.next());
+    assertEquals(table, results.getString(1));
+    assertEquals(value, results.getLong(2));
+  }
+
+}
+
+// End RemoteDriverMockTest.java