You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/03/07 19:28:15 UTC

[37/59] [partial] calcite git commit: [CALCITE-1078] Detach avatica from the core calcite Maven project

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java
new file mode 100644
index 0000000..1968fcd
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaSeverity;
+import org.apache.calcite.avatica.NoSuchConnectionException;
+import org.apache.calcite.avatica.remote.Service.ErrorResponse;
+import org.apache.calcite.avatica.remote.Service.Request;
+import org.apache.calcite.avatica.remote.Service.Response;
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for {@link Handler}s to extend to inherit functionality common across
+ * serialization strategies.
+ *
+ * @param <T> The format Requests/Responses are serialized as.
+ */
+public abstract class AbstractHandler<T> implements Handler<T> {
+  private static final String NULL_EXCEPTION_MESSAGE = "(null exception message)";
+  protected final Service service;
+  private RpcMetadataResponse metadata = null;
+
+  public AbstractHandler(Service service) {
+    this.service = service;
+  }
+
+  /** Deserializes a request per the concrete {@link Handler} implementation. */
+  abstract Request decode(T serializedRequest) throws IOException;
+
+  /**
+   * Serialize the given {@link Response} per the concrete {@link Handler} implementation.
+   *
+   * @param response The {@link Response} to serialize.
+   * @return A serialized representation of the {@link Response}.
+   * @throws IOException If the response cannot be serialized.
+   */
+  abstract T encode(Response response) throws IOException;
+
+  /**
+   * Unwrap Avatica-specific context about a given exception.
+   *
+   * @param e A caught exception thrown by an Avatica implementation.
+   * @return An {@link ErrorResponse} carrying whatever context could be extracted.
+   */
+  ErrorResponse unwrapException(Exception e) {
+    // By default, we know nothing extra.
+    int errorCode = ErrorResponse.UNKNOWN_ERROR_CODE;
+    String sqlState = ErrorResponse.UNKNOWN_SQL_STATE;
+    AvaticaSeverity severity = AvaticaSeverity.UNKNOWN;
+    String errorMsg = null;
+    // Extract the contextual information if we have it. We may not.
+    if (e instanceof AvaticaRuntimeException) {
+      AvaticaRuntimeException rte = (AvaticaRuntimeException) e;
+      errorCode = rte.getErrorCode();
+      sqlState = rte.getSqlState();
+      severity = rte.getSeverity();
+      errorMsg = rte.getErrorMessage();
+    } else if (e instanceof NoSuchConnectionException) {
+      errorCode = ErrorResponse.MISSING_CONNECTION_ERROR_CODE;
+      severity = AvaticaSeverity.ERROR;
+      errorMsg = e.getMessage();
+    } else {
+      // Try to construct a meaningful error message when the server impl doesn't provide one.
+      errorMsg = getCausalChain(e);
+    }
+
+    return new ErrorResponse(e, errorMsg, errorCode, sqlState, severity, metadata);
+  }
+
+  /**
+   * Compute a response for the given request, handling errors generated by that computation.
+   *
+   * @param serializedRequest The caller's request.
+   * @return The serialized response (or serialized error) paired with a status code.
+   * @throws RuntimeException If the request cannot be decoded or the error cannot be encoded.
+   */
+  public HandlerResponse<T> apply(T serializedRequest) {
+    final Service.Request request;
+    try {
+      request = decode(serializedRequest);
+    } catch (IOException e) {
+      // TODO provide a canned ErrorResponse.
+      throw new RuntimeException(e);
+    }
+
+    try {
+      final Service.Response response = request.accept(service);
+      return new HandlerResponse<>(encode(response), HTTP_OK);
+    } catch (Exception e) {
+      ErrorResponse errorResp = unwrapException(e);
+      try {
+        return new HandlerResponse<>(encode(errorResp), HTTP_INTERNAL_SERVER_ERROR);
+      } catch (IOException e1) {
+        // TODO provide a canned ErrorResponse
+        // If we can't serialize the error response, we can't give a meaningful error to the
+        // caller. Rethrow the original failure without creating more exceptions than needed,
+        // but keep the serialization failure attached to it so it is not silently lost.
+        e.addSuppressed(e1);
+        if (e instanceof RuntimeException) {
+          throw (RuntimeException) e;
+        }
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Summarizes an Exception and its causes as {@code Type: message -> CauseType: message}.
+   *
+   * @param e The Exception to summarize.
+   * @return A summary message for the Exception.
+   */
+  private String getCausalChain(Exception e) {
+    StringBuilder sb = new StringBuilder(16);
+    Throwable curr = e;
+    // Could use Guava, but that would increase dependency set unnecessarily.
+    while (null != curr) {
+      if (sb.length() > 0) {
+        sb.append(" -> ");
+      }
+      String message = curr.getMessage();
+      sb.append(curr.getClass().getSimpleName()).append(": ");
+      sb.append(null == message ? NULL_EXCEPTION_MESSAGE : message);
+      curr = curr.getCause();
+    }
+    if (sb.length() == 0) {
+      // Only reachable when no exception was supplied at all, so nothing was appended.
+      return "Unknown error message";
+    }
+    return sb.toString();
+  }
+
+  @Override public void setRpcMetadata(RpcMetadataResponse metadata) {
+    this.metadata = metadata;
+  }
+}
+
+// End AbstractHandler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
new file mode 100644
index 0000000..ffaa360
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A common base class for {@link Service} implementations that implement
+ * modifications made to response objects.
+ */
+public abstract class AbstractService implements Service {
+  private RpcMetadataResponse rpcMetadata = null;
+
+  /** Represents the serialization of the data over a transport. */
+  enum SerializationType {
+    JSON,
+    PROTOBUF
+  }
+
+  /** Returns the manner in which the data is serialized. */
+  abstract SerializationType getSerializationType();
+
+  /** Modifies a signature, changing the representation of numeric columns
+   * within it. This deals with the fact that JSON transmits a small long value,
+   * or a float which is a whole number, as an integer. Thus the accessors need
+   * be prepared to accept any numeric type. */
+  Meta.Signature finagle(Meta.Signature signature) {
+    final List<ColumnMetaData> columns = new ArrayList<>();
+    for (ColumnMetaData column : signature.columns) {
+      columns.add(finagle(column));
+    }
+    if (columns.equals(signature.columns)) {
+      return signature;
+    }
+    return new Meta.Signature(columns, signature.sql,
+        signature.parameters, signature.internalParameters,
+        signature.cursorFactory, signature.statementType);
+  }
+
+  /** Maps numeric reps and DECIMAL/NUMERIC columns to {@code NUMBER}; BINARY/VARBINARY
+   * columns become {@code STRING} under JSON but are returned unchanged under protobuf. */
+  ColumnMetaData finagle(ColumnMetaData column) {
+    switch (column.type.rep) {
+    case BYTE:
+    case PRIMITIVE_BYTE:
+    case DOUBLE:
+    case PRIMITIVE_DOUBLE:
+    case FLOAT:
+    case PRIMITIVE_FLOAT:
+    case INTEGER:
+    case PRIMITIVE_INT:
+    case SHORT:
+    case PRIMITIVE_SHORT:
+    case LONG:
+    case PRIMITIVE_LONG:
+      return column.setRep(ColumnMetaData.Rep.NUMBER);
+    default:
+      break; // not a numeric rep; decide by JDBC type id below
+    }
+    switch (column.type.id) {
+    case Types.VARBINARY:
+    case Types.BINARY:
+      switch (getSerializationType()) {
+      case JSON:
+        return column.setRep(ColumnMetaData.Rep.STRING);
+      case PROTOBUF:
+        return column;
+      default:
+        throw new IllegalStateException("Unhandled case statement");
+      }
+    case Types.DECIMAL:
+    case Types.NUMERIC:
+      return column.setRep(ColumnMetaData.Rep.NUMBER);
+    default:
+      return column;
+    }
+  }
+
+  /** Returns {@code response} with its statement handle finagled, or as-is if unchanged. */
+  PrepareResponse finagle(PrepareResponse response) {
+    final Meta.StatementHandle statement = finagle(response.statement);
+    if (statement == response.statement) {
+      return response;
+    }
+    return new PrepareResponse(statement, rpcMetadata);
+  }
+
+  /** Returns {@code h} with its signature finagled, or {@code h} itself if unchanged. */
+  Meta.StatementHandle finagle(Meta.StatementHandle h) {
+    final Meta.Signature signature = finagle(h.signature);
+    if (signature == h.signature) {
+      return h;
+    }
+    return new Meta.StatementHandle(h.connectionId, h.id, signature);
+  }
+
+  /** Finagles the signature of a result set; update counts and null signatures pass through. */
+  ResultSetResponse finagle(ResultSetResponse r) {
+    if (r.updateCount != -1) {
+      assert r.signature == null;
+      return r;
+    }
+    if (r.signature == null) {
+      return r;
+    }
+    final Meta.Signature signature = finagle(r.signature);
+    if (signature == r.signature) {
+      return r;
+    }
+    return new ResultSetResponse(r.connectionId, r.statementId, r.ownStatement,
+        signature, r.firstFrame, r.updateCount, rpcMetadata);
+  }
+
+  /** Finagles every result set in {@code r}; returns {@code r} itself if nothing changed. */
+  ExecuteResponse finagle(ExecuteResponse r) {
+    if (r.missingStatement) {
+      return r;
+    }
+    final List<ResultSetResponse> results = new ArrayList<>();
+    int changeCount = 0;
+    for (ResultSetResponse result : r.results) {
+      ResultSetResponse result2 = finagle(result);
+      if (result2 != result) {
+        ++changeCount;
+      }
+      results.add(result2);
+    }
+    if (changeCount == 0) {
+      return r;
+    }
+    return new ExecuteResponse(results, r.missingStatement, rpcMetadata);
+  }
+
+  @Override public void setRpcMetadata(RpcMetadataResponse metadata) {
+    // OK if this is null
+    this.rpcMetadata = metadata;
+  }
+}
+
+// End AbstractService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
new file mode 100644
index 0000000..3bda248
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.protocol.RequestExpectContinue;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.pool.BasicConnFactory;
+import org.apache.http.impl.pool.BasicConnPool;
+import org.apache.http.impl.pool.BasicPoolEntry;
+import org.apache.http.message.BasicHttpEntityEnclosingRequest;
+import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.protocol.HttpProcessor;
+import org.apache.http.protocol.HttpProcessorBuilder;
+import org.apache.http.protocol.HttpRequestExecutor;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.http.util.EntityUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.concurrent.Future;
+
+/**
+ * A common class to invoke HTTP requests against the Avatica server agnostic of the data being
+ * sent and received across the wire.
+ */
+public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient {
+  private static final Logger LOG = LoggerFactory.getLogger(AvaticaCommonsHttpClientImpl.class);
+  private static final ConnectionReuseStrategy REUSE = DefaultConnectionReuseStrategy.INSTANCE;
+
+  // Some basic exposed configurations
+  private static final String MAX_POOLED_CONNECTION_PER_ROUTE_KEY =
+      "avatica.pooled.connections.per.route";
+  private static final String MAX_POOLED_CONNECTION_PER_ROUTE_DEFAULT = "4";
+  private static final String MAX_POOLED_CONNECTIONS_KEY = "avatica.pooled.connections.max";
+  private static final String MAX_POOLED_CONNECTIONS_DEFAULT = "16";
+
+  protected final HttpHost host;
+  protected final HttpProcessor httpProcessor;
+  protected final HttpRequestExecutor httpExecutor;
+  protected final BasicConnPool httpPool;
+
+  public AvaticaCommonsHttpClientImpl(URL url) {
+    this.host = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
+
+    this.httpProcessor = HttpProcessorBuilder.create()
+        .add(new RequestContent())
+        .add(new RequestTargetHost())
+        .add(new RequestConnControl())
+        .add(new RequestExpectContinue()).build();
+
+    this.httpExecutor = new HttpRequestExecutor();
+
+    this.httpPool = new BasicConnPool(new BasicConnFactory());
+    int maxPerRoute = Integer.parseInt(
+        System.getProperty(MAX_POOLED_CONNECTION_PER_ROUTE_KEY,
+            MAX_POOLED_CONNECTION_PER_ROUTE_DEFAULT));
+    int maxTotal = Integer.parseInt(
+        System.getProperty(MAX_POOLED_CONNECTIONS_KEY,
+            MAX_POOLED_CONNECTIONS_DEFAULT));
+    httpPool.setDefaultMaxPerRoute(maxPerRoute);
+    httpPool.setMaxTotal(maxTotal);
+  }
+
+  public byte[] send(byte[] request) {
+    while (true) {
+      boolean reusable = false;
+      // Get a connection from the pool
+      Future<BasicPoolEntry> future = this.httpPool.lease(host, null);
+      BasicPoolEntry entry = null;
+      try {
+        entry = future.get();
+        HttpCoreContext coreContext = HttpCoreContext.create();
+        coreContext.setTargetHost(host);
+
+        HttpClientConnection conn = entry.getConnection();
+
+        ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM);
+
+        BasicHttpEntityEnclosingRequest postRequest =
+            new BasicHttpEntityEnclosingRequest("POST", "/");
+        postRequest.setEntity(entity);
+
+        httpExecutor.preProcess(postRequest, httpProcessor, coreContext);
+        HttpResponse response = httpExecutor.execute(postRequest, conn, coreContext);
+        httpExecutor.postProcess(response, httpProcessor, coreContext);
+
+        // Should the connection be kept alive?
+        reusable = REUSE.keepAlive(response, coreContext);
+
+        final int statusCode = response.getStatusLine().getStatusCode();
+        if (HttpURLConnection.HTTP_UNAVAILABLE == statusCode) {
+          // Could be sitting behind a load-balancer, try again.
+          continue;
+        }
+
+        return EntityUtils.toByteArray(response.getEntity());
+      } catch (Exception e) {
+        LOG.debug("Failed to execute HTTP request", e);
+        throw new RuntimeException(e);
+      } finally {
+        // Release the connection back to the pool, marking if it's good to reuse or not.
+        httpPool.release(entry, reusable);
+      }
+    }
+  }
+}
+
+// End AvaticaCommonsHttpClientImpl.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java
new file mode 100644
index 0000000..eac1b74
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+/**
+ * An interface which defines how requests are sent to the Avatica server.
+ */
+public interface AvaticaHttpClient {
+
+  /**
+   * Sends a serialized request to the Avatica server and returns the server's reply.
+   *
+   * @param request The bytes of the serialized request.
+   * @return The bytes of the serialized response.
+   */
+  byte[] send(byte[] request);
+
+}
+
+// End AvaticaHttpClient.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java
new file mode 100644
index 0000000..b5d213a
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+import java.net.URL;
+
+/**
+ * A factory for constructing {@link AvaticaHttpClient}s.
+ */
+public interface AvaticaHttpClientFactory {
+
+  /**
+   * Constructs the appropriate implementation of {@link AvaticaHttpClient}.
+   *
+   * @param url URL which the returned client is for.
+   * @param config Configuration consulted when choosing and constructing the implementation.
+   * @return An instance of {@link AvaticaHttpClient}.
+   */
+  AvaticaHttpClient getClient(URL url, ConnectionConfig config);
+
+}
+
+// End AvaticaHttpClientFactory.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
new file mode 100644
index 0000000..cd6cbce
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.util.Objects;
+
+/**
+ * Default implementation of {@link AvaticaHttpClientFactory} which chooses an implementation
+ * from a property.
+ */
+public class AvaticaHttpClientFactoryImpl implements AvaticaHttpClientFactory {
+  public static final String HTTP_CLIENT_IMPL_DEFAULT =
+      AvaticaCommonsHttpClientImpl.class.getName();
+
+  // Must stay public: looked up for Type.PLUGIN
+  public static final AvaticaHttpClientFactoryImpl INSTANCE = new AvaticaHttpClientFactoryImpl();
+
+  // Must stay public: invoked for Type.PLUGIN
+  public AvaticaHttpClientFactoryImpl() {}
+
+  /**
+   * Gives access to the shared {@link #INSTANCE} of this factory.
+   *
+   * @return The shared instance.
+   */
+  public static AvaticaHttpClientFactoryImpl getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Reflectively instantiates the class named by {@code config.httpClientClass()}, or
+   * {@link #HTTP_CLIENT_IMPL_DEFAULT} when none is configured, through its {@code (URL)}
+   * constructor. Every failure is rethrown as a {@link RuntimeException} naming the class.
+   */
+  @Override public AvaticaHttpClient getClient(URL url, ConnectionConfig config) {
+    final String configured = config.httpClientClass();
+    final String className = null == configured ? HTTP_CLIENT_IMPL_DEFAULT : configured;
+    try {
+      final Constructor<?> ctor = Class.forName(className).getConstructor(URL.class);
+      return AvaticaHttpClient.class.cast(ctor.newInstance(Objects.requireNonNull(url)));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to construct AvaticaHttpClient implementation "
+          + className, e);
+    }
+  }
+}
+
+// End AvaticaHttpClientFactoryImpl.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java
new file mode 100644
index 0000000..c100eec
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * A common class to invoke HTTP requests against the Avatica server agnostic of the data being
+ * sent and received across the wire.
+ */
+public class AvaticaHttpClientImpl implements AvaticaHttpClient {
+  protected final URL url;
+
+  public AvaticaHttpClientImpl(URL url) {
+    this.url = url;
+  }
+
+  public byte[] send(byte[] request) {
+    // TODO back-off policy?
+    while (true) {
+      try {
+        final HttpURLConnection connection = openConnection();
+        connection.setRequestMethod("POST");
+        connection.setDoInput(true);
+        connection.setDoOutput(true);
+        try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) {
+          wr.write(request);
+          wr.flush();
+          wr.close();
+        }
+        final int responseCode = connection.getResponseCode();
+        final InputStream inputStream;
+        if (responseCode == HttpURLConnection.HTTP_UNAVAILABLE) {
+          // Could be sitting behind a load-balancer, try again.
+          continue;
+        } else if (responseCode != HttpURLConnection.HTTP_OK) {
+          inputStream = connection.getErrorStream();
+        } else {
+          inputStream = connection.getInputStream();
+        }
+        return AvaticaUtils.readFullyToBytes(inputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  HttpURLConnection openConnection() throws IOException {
+    return (HttpURLConnection) url.openConnection();
+  }
+}
+
+// End AvaticaHttpClientImpl.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java
new file mode 100644
index 0000000..d5ae9b1
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ConnectionConfigImpl;
+
+import java.util.Properties;
+
+/** Implementation of {@link org.apache.calcite.avatica.ConnectionConfig}
+ * with extra properties specific to Remote Driver. */
+public class AvaticaRemoteConnectionConfigImpl extends ConnectionConfigImpl {
+  public AvaticaRemoteConnectionConfigImpl(Properties properties) {
+    super(properties);
+  }
+  /** Looks up the {@code FACTORY} connection property as a {@link Service.Factory} plugin. */
+  public Service.Factory factory() {
+    return AvaticaRemoteConnectionProperty.FACTORY.wrap(properties)
+        .getPlugin(Service.Factory.class, null); // NOTE(review): null is presumably the default
+  }
+}
+
+// End AvaticaRemoteConnectionConfigImpl.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
new file mode 100644
index 0000000..5e755f8
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ConnectionProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.calcite.avatica.ConnectionConfigImpl.PropEnv;
+import static org.apache.calcite.avatica.ConnectionConfigImpl.parse;
+
+/**
+ * Connection properties that are specific to the Avatica remote driver.
+ */
+public enum AvaticaRemoteConnectionProperty implements ConnectionProperty {
+  /** Factory plugin; a string property with no default value. */
+  FACTORY("factory", Type.STRING, null);
+
+  private final String camelName;
+  private final Type type;
+  private final Object defaultValue;
+
+  private static final Map<String, AvaticaRemoteConnectionProperty> NAME_TO_PROPS;
+
+  static {
+    final Map<String, AvaticaRemoteConnectionProperty> byName = new HashMap<>();
+    for (AvaticaRemoteConnectionProperty property : values()) {
+      byName.put(property.camelName.toUpperCase(), property);
+      byName.put(property.name(), property);
+    }
+    NAME_TO_PROPS = byName;
+  }
+
+  /** Creates a property. A non-null default value is checked against {@code type}
+   * when assertions are enabled. */
+  AvaticaRemoteConnectionProperty(String camelName, Type type, Object defaultValue) {
+    assert defaultValue == null || type.valid(defaultValue);
+    this.camelName = camelName;
+    this.type = type;
+    this.defaultValue = defaultValue;
+  }
+
+  public String camelName() {
+    return camelName;
+  }
+
+  public Object defaultValue() {
+    return defaultValue;
+  }
+
+  public Type type() {
+    return type;
+  }
+
+  public PropEnv wrap(Properties properties) {
+    return new PropEnv(parse(properties, NAME_TO_PROPS), this);
+  }
+
+  public boolean required() {
+    return false;
+  }
+}
+
+// End AvaticaRemoteConnectionProperty.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java
new file mode 100644
index 0000000..2f9a1cd
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaSeverity;
+import org.apache.calcite.avatica.remote.Service.ErrorResponse;
+
+import java.util.Objects;
+
+/**
+ * A {@link RuntimeException} thrown by Avatica with additional contextual information about
+ * what happened to cause the Exception.
+ */
+public class AvaticaRuntimeException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+  private final String errorMessage;
+  private final int errorCode;
+  private final String sqlState;
+  private final AvaticaSeverity severity;
+
+  /**
+   * Constructs an {@code AvaticaRuntimeException} with no additional information.
+   *
+   * <p>It is strongly preferred that the caller invoke
+   * {@link #AvaticaRuntimeException(String, int, String, AvaticaSeverity)}
+   * with proper contextual information.
+   */
+  public AvaticaRuntimeException() {
+    this("No additional context on exception", ErrorResponse.UNKNOWN_ERROR_CODE,
+        ErrorResponse.UNKNOWN_SQL_STATE, AvaticaSeverity.UNKNOWN);
+  }
+
+  /**
+   * Constructs an {@code AvaticaRuntimeException} with the given contextual information
+   * surrounding the error. The message is also what {@link #getMessage()} returns.
+   *
+   * @param errorMessage A human-readable explanation about what happened; not null
+   * @param errorCode Numeric identifier for error
+   * @param sqlState 5-character identifier for error; not null
+   * @param severity Severity; not null
+   */
+  public AvaticaRuntimeException(String errorMessage, int errorCode, String sqlState,
+      AvaticaSeverity severity) {
+    super(Objects.requireNonNull(errorMessage));
+    this.errorMessage = errorMessage;
+    this.errorCode = errorCode;
+    this.sqlState = Objects.requireNonNull(sqlState);
+    this.severity = Objects.requireNonNull(severity);
+  }
+
+  /**
+   * Returns a human-readable error message.
+   */
+  public String getErrorMessage() {
+    return errorMessage;
+  }
+
+  /**
+   * Returns a numeric code for this error.
+   */
+  public int getErrorCode() {
+    return errorCode;
+  }
+
+  /**
+   * Returns the five-character identifier for this error.
+   */
+  public String getSqlState() {
+    return sqlState;
+  }
+
+  /**
+   * Returns the severity at which this exception is thrown.
+   */
+  public AvaticaSeverity getSeverity() {
+    return severity;
+  }
+
+  @Override public String toString() {
+    return new StringBuilder(64).append("AvaticaRuntimeException: [")
+        .append("Message: '").append(errorMessage).append("', ")
+        .append("Error code: '").append(errorCode).append("', ")
+        .append("SQL State: '").append(sqlState).append("', ")
+        .append("Severity: '").append(severity).append("']").toString();
+  }
+}
+
+// End AvaticaRuntimeException.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java
new file mode 100644
index 0000000..e98c486
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.BuiltInConnectionProperty;
+import org.apache.calcite.avatica.ConnectionConfig;
+import org.apache.calcite.avatica.ConnectionProperty;
+import org.apache.calcite.avatica.DriverVersion;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.UnregisteredDriver;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Avatica Remote JDBC driver.
+ */
+public class Driver extends UnregisteredDriver {
+  private static final Logger LOG = LoggerFactory.getLogger(Driver.class);
+  public static final String CONNECT_STRING_PREFIX = "jdbc:avatica:remote:";
+
+  static {
+    new Driver().register();
+  }
+
+  public Driver() {
+    super();
+  }
+
+  /**
+   * Defines the method of message serialization used by the Driver.
+   */
+  public enum Serialization {
+    JSON,
+    PROTOBUF
+  }
+
+  @Override protected String getConnectStringPrefix() {
+    return CONNECT_STRING_PREFIX;
+  }
+
+  @Override protected DriverVersion createDriverVersion() {
+    return DriverVersion.load(
+        Driver.class,
+        "org-apache-calcite-jdbc.properties",
+        "Avatica Remote JDBC Driver",
+        "unknown version",
+        "Avatica",
+        "unknown version");
+  }
+
+  @Override protected Collection<ConnectionProperty> getConnectionProperties() {
+    final List<ConnectionProperty> list = new ArrayList<>();
+    Collections.addAll(list, BuiltInConnectionProperty.values());
+    Collections.addAll(list, AvaticaRemoteConnectionProperty.values());
+    return list;
+  }
+
+  @Override public Meta createMeta(AvaticaConnection connection) {
+    final ConnectionConfig config = connection.config();
+    final Service service = createService(connection, config);
+    return new RemoteMeta(connection, service);
+  }
+
+  /** Creates a {@link Service}: from the configured factory if there is one, else a remote
+   * JSON or protobuf service if a URL is set, else a {@link MockJsonService}.
+   *
+   * @param connection The {@link AvaticaConnection} to use.
+   * @param config Configuration properties
+   * @return A Service implementation.
+   */
+  Service createService(AvaticaConnection connection, ConnectionConfig config) {
+    final Service.Factory metaFactory = config.factory();
+    final Service service;
+    if (metaFactory != null) {
+      service = metaFactory.create(connection);
+    } else if (config.url() != null) {
+      final AvaticaHttpClient httpClient = getHttpClient(connection, config);
+      final Serialization serializationType = getSerialization(config);
+
+      LOG.debug("Instantiating {} service", serializationType);
+      switch (serializationType) {
+      case JSON:
+        service = new RemoteService(httpClient);
+        break;
+      case PROTOBUF:
+        service = new RemoteProtobufService(httpClient, new ProtobufTranslationImpl());
+        break;
+      default:
+        throw new IllegalArgumentException("Unhandled serialization type: " + serializationType);
+      }
+    } else {
+      service = new MockJsonService(Collections.<String, String>emptyMap());
+    }
+    return service;
+  }
+
+  /**
+   * Creates the HTTP client that communicates with the Avatica server.
+   *
+   * @param connection The {@link AvaticaConnection}.
+   * @param config The configuration.
+   * @return An {@link AvaticaHttpClient} implementation.
+   */
+  AvaticaHttpClient getHttpClient(AvaticaConnection connection, ConnectionConfig config) {
+    final URL url;
+    try {
+      url = new URL(config.url());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException(e);
+    }
+
+    AvaticaHttpClientFactory httpClientFactory = config.httpClientFactory();
+
+    return httpClientFactory.getClient(url, config);
+  }
+
+  @Override public Connection connect(String url, Properties info)
+      throws SQLException {
+    final AvaticaConnection conn = (AvaticaConnection) super.connect(url, info);
+    if (conn == null) {
+      // Not a URL for our driver
+      return null;
+    }
+
+    // Open the remote connection. NOTE(review): createMeta() also builds a Service -- reuse?
+    final ConnectionConfig config = conn.config();
+    final Service service = createService(conn, config);
+
+    service.apply(
+        new Service.OpenConnectionRequest(conn.id,
+            Service.OpenConnectionRequest.serializeProperties(info)));
+
+    return conn;
+  }
+
+  /** Returns the serialization named by the configuration (case-insensitive), or
+   * {@link Serialization#JSON} if none is configured. An unrecognized name fails with a
+   * {@link RuntimeException} wrapping the {@link IllegalArgumentException}. */
+  Serialization getSerialization(ConnectionConfig config) {
+    final String serializationStr = config.serialization();
+    if (null == serializationStr) {
+      return Serialization.JSON;
+    }
+    try {
+      return Serialization.valueOf(serializationStr.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
+
+// End Driver.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java
new file mode 100644
index 0000000..30d026c
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+
+import java.util.Objects;
+
+/**
+ * API for text request-response calls to an Avatica server.
+ *
+ * @param <T> The type this handler accepts and returns
+ */
+public interface Handler<T> {
+  int HTTP_OK = 200;
+  int HTTP_INTERNAL_SERVER_ERROR = 500;
+  String HANDLER_SERIALIZATION_METRICS_NAME = "Handler.Serialization";
+
+  /**
+   * Immutable pair of a response and its status code; the response must not be null.
+   */
+  public class HandlerResponse<T> {
+    private final T response;
+    private final int statusCode;
+
+    public HandlerResponse(T response, int statusCode) {
+      this.response = Objects.requireNonNull(response);
+      this.statusCode = statusCode;
+    }
+
+    public T getResponse() {
+      return response;
+    }
+
+    public int getStatusCode() {
+      return statusCode;
+    }
+
+    @Override public String toString() {
+      return "Response: " + response + ", Status:" + statusCode;
+    }
+  }
+
+  /** Processes a request and returns the response together with a status code. */
+  HandlerResponse<T> apply(T request);
+
+  /** Sets some general server information to return to the client in all responses.
+   *
+   * @param metadata Server-wide information
+   */
+  void setRpcMetadata(RpcMetadataResponse metadata);
+}
+
+// End Handler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
new file mode 100644
index 0000000..fd57078
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.Timer;
+import org.apache.calcite.avatica.metrics.Timer.Context;
+import org.apache.calcite.avatica.remote.Service.Request;
+import org.apache.calcite.avatica.remote.Service.Response;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * Implementation of {@link org.apache.calcite.avatica.remote.Handler}
+ * that decodes JSON requests, sends them to a {@link Service},
+ * and encodes the responses into JSON.
+ *
+ * @see org.apache.calcite.avatica.remote.JsonService
+ */
+public class JsonHandler extends AbstractHandler<String> {
+
+  protected static final ObjectMapper MAPPER = JsonService.MAPPER;
+
+  final MetricsSystem metrics;
+  final Timer serializationTimer;
+
+  public JsonHandler(Service service, MetricsSystem metrics) {
+    super(service);
+    this.metrics = metrics;
+    this.serializationTimer = this.metrics.getTimer(
+        MetricsHelper.concat(JsonHandler.class, HANDLER_SERIALIZATION_METRICS_NAME));
+  }
+
+  @Override public HandlerResponse<String> apply(String jsonRequest) {
+    return super.apply(jsonRequest);
+  }
+
+  /** Parses a JSON request inside a context started on the serialization timer. */
+  @Override Request decode(String request) throws IOException {
+    try (final Context ctx = serializationTimer.start()) {
+      return MAPPER.readValue(request, Service.Request.class);
+    }
+  }
+
+  /** Serializes the provided object as JSON inside a serialization-timer context.
+   *
+   * @param response The object to serialize.
+   * @return A JSON string.
+   */
+  @Override String encode(Response response) throws IOException {
+    try (final Context ctx = serializationTimer.start()) {
+      final StringWriter w = new StringWriter();
+      MAPPER.writeValue(w, response);
+      return w.toString();
+    }
+  }
+}
+
+// End JsonHandler.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
new file mode 100644
index 0000000..668b3be
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * Implementation of {@link org.apache.calcite.avatica.remote.Service}
+ * that encodes requests and responses as JSON.
+ */
+public abstract class JsonService extends AbstractService {
+  public static final ObjectMapper MAPPER;
+  static {
+    MAPPER = new ObjectMapper();
+    MAPPER.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+    MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
+    MAPPER.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
+  }
+
+  public JsonService() {
+  }
+
+  /** Derived class should implement this method to transport requests and
+   * responses to and from the peer service. */
+  public abstract String apply(String request);
+
+  @Override SerializationType getSerializationType() {
+    return SerializationType.JSON;
+  }
+
+  /** Decodes a response, throwing if it is an error or of the wrong type. Visible for testing. */
+  protected static <T> T decode(String response, Class<T> expectedType)
+      throws IOException {
+    Response resp = MAPPER.readValue(response, Response.class);
+    if (resp instanceof ErrorResponse) {
+      throw ((ErrorResponse) resp).toException();
+    } else if (!expectedType.isAssignableFrom(resp.getClass())) {
+      throw new ClassCastException("Cannot cast " + resp.getClass() + " into " + expectedType);
+    }
+
+    return expectedType.cast(resp);
+  }
+
+  /** Encodes a request as a JSON string. Visible for testing. */
+  protected static <T> String encode(T request) throws IOException {
+    final StringWriter w = new StringWriter();
+    MAPPER.writeValue(w, request);
+    return w.toString();
+  }
+
+  protected RuntimeException handle(IOException e) {
+    return new RuntimeException(e);
+  }
+
+  @Override public ResultSetResponse apply(CatalogsRequest request) {
+    try {
+      return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public ResultSetResponse apply(SchemasRequest request) {
+    try {
+      return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public ResultSetResponse apply(TablesRequest request) {
+    try {
+      return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public ResultSetResponse apply(TableTypesRequest request) {
+    try {
+      return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public ResultSetResponse apply(TypeInfoRequest request) {
+    try {
+      return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public ResultSetResponse apply(ColumnsRequest request) {
+    try {
+      return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public PrepareResponse apply(PrepareRequest request) {
+    try {
+      return finagle(decode(apply(encode(request)), PrepareResponse.class));
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public ExecuteResponse apply(PrepareAndExecuteRequest request) {
+    try {
+      return finagle(decode(apply(encode(request)), ExecuteResponse.class));
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public FetchResponse apply(FetchRequest request) {
+    try {
+      return decode(apply(encode(request)), FetchResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public ExecuteResponse apply(ExecuteRequest request) {
+    try {
+      return finagle(decode(apply(encode(request)), ExecuteResponse.class));
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public CreateStatementResponse apply(CreateStatementRequest request) {
+    try {
+      return decode(apply(encode(request)), CreateStatementResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public CloseStatementResponse apply(CloseStatementRequest request) {
+    try {
+      return decode(apply(encode(request)), CloseStatementResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public OpenConnectionResponse apply(OpenConnectionRequest request) {
+    try {
+      return decode(apply(encode(request)), OpenConnectionResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public CloseConnectionResponse apply(CloseConnectionRequest request) {
+    try {
+      return decode(apply(encode(request)), CloseConnectionResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public ConnectionSyncResponse apply(ConnectionSyncRequest request) {
+    try {
+      return decode(apply(encode(request)), ConnectionSyncResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public DatabasePropertyResponse apply(DatabasePropertyRequest request) {
+    try {
+      return decode(apply(encode(request)), DatabasePropertyResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public SyncResultsResponse apply(SyncResultsRequest request) {
+    try {
+      return decode(apply(encode(request)), SyncResultsResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public CommitResponse apply(CommitRequest request) {
+    try {
+      return decode(apply(encode(request)), CommitResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+
+  @Override public RollbackResponse apply(RollbackRequest request) {
+    try {
+      return decode(apply(encode(request)), RollbackResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+}
+
+// End JsonService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java
new file mode 100644
index 0000000..0af6300
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * Implementation of {@link org.apache.calcite.avatica.remote.Service}
+ * that goes to an in-process instance of {@code Service}.
+ */
+public class LocalJsonService extends JsonService {
+  private final Service service;
+
+  public LocalJsonService(Service service) {
+    this.service = service;
+  }
+
+  @Override public String apply(String request) {
+    try {
+      final Request decoded = MAPPER.readValue(request, Request.class);
+      final Response answer = decoded.accept(service);
+      final StringWriter json = new StringWriter();
+      MAPPER.writeValue(json, answer);
+      return json.toString();
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
+}
+
+// End LocalJsonService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
new file mode 100644
index 0000000..76e2392
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import java.io.IOException;
+
+/**
+ * A Service implementation that performs protocol buffer serialization on request and responses
+ * on either side of computing a response from a request to mimic some transport to a server which
+ * would normally perform such computation.
+ */
+public class LocalProtobufService extends ProtobufService {
+  private final Service service;
+  private final ProtobufTranslation translation;
+
+  public LocalProtobufService(Service service, ProtobufTranslation translation) {
+    this.translation = translation;
+    this.service = service;
+  }
+
+  @Override public Response _apply(Request request) {
+    try {
+      // Client side: turn the request into bytes, as if about to send it
+      final byte[] requestBytes = translation.serializeRequest(request);
+
+      // (a real deployment would move the bytes over some transport here)
+
+      // Server side: rebuild the request from those bytes
+      final Request received = translation.parseRequest(requestBytes);
+
+      // Server side: run the request against the service and serialize the answer
+      final byte[] responseBytes = translation.serializeResponse(received.accept(service));
+
+      // (a real deployment would move the bytes over some transport here)
+
+      // Client side: rebuild the response from the bytes
+      return translation.parseResponse(responseBytes);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
+
+// End LocalProtobufService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
new file mode 100644
index 0000000..c070ec0
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.Meta;
+
+import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.MissingResultsException;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.Timer;
+import org.apache.calcite.avatica.metrics.Timer.Context;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.calcite.avatica.remote.MetricsHelper.concat;
+
+/**
+ * Implementation of {@link Service} that answers each request in-process by delegating to a
+ * local {@link Meta}.
+ *
+ * <p>Prepare, execute, prepare-and-execute, commit and connection-sync calls are wrapped in
+ * {@link Timer}s obtained from the supplied {@link MetricsSystem}. Responses other than those
+ * for commit and rollback are built with the RPC metadata most recently handed to
+ * {@link #setRpcMetadata}; until that method is called the field is {@code null}.
+ */
+public class LocalService implements Service {
+  final Meta meta;
+  final MetricsSystem metrics;
+
+  private final Timer executeTimer;
+  private final Timer commitTimer;
+  private final Timer prepareTimer;
+  private final Timer prepareAndExecuteTimer;
+  private final Timer connectionSyncTimer;
+
+  private RpcMetadataResponse serverLevelRpcMetadata;
+
+  /** Creates a service over {@code meta} that uses the {@link NoopMetricsSystem} instance. */
+  public LocalService(Meta meta) {
+    this(meta, NoopMetricsSystem.getInstance());
+  }
+
+  /**
+   * Creates a service over {@code meta} whose timers are obtained from {@code metrics}.
+   *
+   * @throws NullPointerException if {@code metrics} is null
+   */
+  public LocalService(Meta meta, MetricsSystem metrics) {
+    this.meta = meta;
+    this.metrics = Objects.requireNonNull(metrics);
+
+    this.executeTimer = timerNamed("Execute");
+    this.commitTimer = timerNamed("Commit");
+    this.prepareTimer = timerNamed("Prepare");
+    this.prepareAndExecuteTimer = timerNamed("PrepareAndExecute");
+    this.connectionSyncTimer = timerNamed("ConnectionSync");
+  }
+
+  /** Fetches the timer whose name is {@code suffix} qualified by this class. */
+  private Timer timerNamed(String suffix) {
+    return metrics.getTimer(concat(LocalService.class, suffix));
+  }
+
+  /**
+   * Stores the metadata that is attached to the responses built afterwards.
+   *
+   * @throws NullPointerException if {@code serverLevelRpcMetadata} is null
+   */
+  @Override public void setRpcMetadata(RpcMetadataResponse serverLevelRpcMetadata) {
+    this.serverLevelRpcMetadata = Objects.requireNonNull(serverLevelRpcMetadata);
+  }
+
+  /**
+   * Returns {@code source} itself when it already is a {@link List}, otherwise a new list
+   * holding its elements in iteration order.
+   */
+  private static <E> List<E> materialize(Iterable<E> source) {
+    if (!(source instanceof List)) {
+      final List<E> copy = new ArrayList<>();
+      for (E element : source) {
+        copy.add(element);
+      }
+      return copy;
+    }
+    return (List<E>) source;
+  }
+
+  /** Converts a result set (not serializable) into a serializable response. */
+  public ResultSetResponse toResponse(Meta.MetaResultSet resultSet) {
+    if (resultSet.updateCount != -1) {
+      // An update count is all there is to report: no signature, no frame.
+      return new ResultSetResponse(resultSet.connectionId, resultSet.statementId,
+          resultSet.ownStatement, null, null, resultSet.updateCount, serverLevelRpcMetadata);
+    }
+
+    Meta.Signature signature = resultSet.signature;
+    Meta.CursorFactory factory = resultSet.signature.cursorFactory;
+    Meta.Frame frame = null;
+    int updateCount = -1;
+
+    if (resultSet.firstFrame == null) {
+      // No first frame: the rows are still pulled through the Meta, but they are not
+      // attached to the response, which keeps a null frame.
+      collectRows(resultSet);
+      factory = Meta.CursorFactory.LIST;
+    } else {
+      final List<Object> rows = materialize(resultSet.firstFrame.rows);
+      switch (factory.style) {
+      case ARRAY:
+      case RECORD:
+        factory = Meta.CursorFactory.LIST;
+        break;
+      case MAP:
+      case LIST:
+        // Already in a form that is sent as-is.
+        break;
+      default:
+        factory = Meta.CursorFactory.map(factory.fieldNames);
+      }
+
+      frame = new Meta.Frame(0, resultSet.firstFrame.done, rows);
+
+      if (signature.statementType != null && signature.statementType.canUpdate()) {
+        // The statement can update: report the first column of the first row as the
+        // update count instead of sending a frame.
+        frame = null;
+        updateCount = ((Number) ((List) rows.get(0)).get(0)).intValue();
+      }
+    }
+
+    if (factory != resultSet.signature.cursorFactory) {
+      signature = signature.setCursorFactory(factory);
+    }
+
+    return new ResultSetResponse(resultSet.connectionId, resultSet.statementId,
+        resultSet.ownStatement, signature, frame, updateCount, serverLevelRpcMetadata);
+  }
+
+  /** Asks the {@link Meta} for an iterable over {@code resultSet} and collects its rows. */
+  private List<List<Object>> collectRows(Meta.MetaResultSet resultSet) {
+    final Meta.StatementHandle handle =
+        new Meta.StatementHandle(resultSet.connectionId, resultSet.statementId, null);
+    final List<TypedValue> noParameters = Collections.emptyList();
+    final Iterable<Object> rows = meta.createIterable(handle, null, resultSet.signature,
+        noParameters, resultSet.firstFrame);
+    final List<List<Object>> collected = new ArrayList<>();
+    return MetaImpl.collect(resultSet.signature.cursorFactory, rows, collected);
+  }
+
+  /** Answers a catalogs request with the result of {@link Meta#getCatalogs}. */
+  public ResultSetResponse apply(CatalogsRequest request) {
+    return toResponse(meta.getCatalogs(new Meta.ConnectionHandle(request.connectionId)));
+  }
+
+  /** Answers a schemas request with the result of {@link Meta#getSchemas}. */
+  public ResultSetResponse apply(SchemasRequest request) {
+    return toResponse(
+        meta.getSchemas(new Meta.ConnectionHandle(request.connectionId),
+            request.catalog,
+            Meta.Pat.of(request.schemaPattern)));
+  }
+
+  /** Answers a tables request with the result of {@link Meta#getTables}. */
+  public ResultSetResponse apply(TablesRequest request) {
+    return toResponse(
+        meta.getTables(new Meta.ConnectionHandle(request.connectionId),
+            request.catalog,
+            Meta.Pat.of(request.schemaPattern),
+            Meta.Pat.of(request.tableNamePattern),
+            request.typeList));
+  }
+
+  /** Answers a table-types request with the result of {@link Meta#getTableTypes}. */
+  public ResultSetResponse apply(TableTypesRequest request) {
+    return toResponse(meta.getTableTypes(new Meta.ConnectionHandle(request.connectionId)));
+  }
+
+  /** Answers a type-info request with the result of {@link Meta#getTypeInfo}. */
+  public ResultSetResponse apply(TypeInfoRequest request) {
+    return toResponse(meta.getTypeInfo(new Meta.ConnectionHandle(request.connectionId)));
+  }
+
+  /** Answers a columns request with the result of {@link Meta#getColumns}. */
+  public ResultSetResponse apply(ColumnsRequest request) {
+    return toResponse(
+        meta.getColumns(new Meta.ConnectionHandle(request.connectionId),
+            request.catalog,
+            Meta.Pat.of(request.schemaPattern),
+            Meta.Pat.of(request.tableNamePattern),
+            Meta.Pat.of(request.columnNamePattern)));
+  }
+
+  /** Prepares the requested SQL on the {@link Meta}; the call is timed. */
+  public PrepareResponse apply(PrepareRequest request) {
+    try (final Context ignored = prepareTimer.start()) {
+      final Meta.ConnectionHandle connection = new Meta.ConnectionHandle(request.connectionId);
+      final Meta.StatementHandle statement =
+          meta.prepare(connection, request.sql, request.maxRowCount);
+      return new PrepareResponse(statement, serverLevelRpcMetadata);
+    }
+  }
+
+  /**
+   * Prepares and executes the requested SQL in one step; the call is timed.
+   *
+   * <p>When the {@link Meta} throws {@link NoSuchStatementException} the response carries no
+   * results and has its flag set to {@code true}.
+   */
+  public ExecuteResponse apply(PrepareAndExecuteRequest request) {
+    try (final Context ignored = prepareAndExecuteTimer.start()) {
+      final Meta.StatementHandle statement =
+          new Meta.StatementHandle(request.connectionId, request.statementId, null);
+      try {
+        final Meta.ExecuteResult outcome = meta.prepareAndExecute(statement, request.sql,
+            request.maxRowCount, new NoopPrepareCallback());
+        final List<ResultSetResponse> responses = new ArrayList<>();
+        for (Meta.MetaResultSet each : outcome.resultSets) {
+          responses.add(toResponse(each));
+        }
+        return new ExecuteResponse(responses, false, serverLevelRpcMetadata);
+      } catch (NoSuchStatementException e) {
+        // The Statement doesn't exist anymore, bubble up this information
+        return new ExecuteResponse(null, true, serverLevelRpcMetadata);
+      }
+    }
+  }
+
+  /**
+   * Fetches a frame of rows for an existing statement.
+   *
+   * <p>A {@link NullPointerException} or {@link NoSuchStatementException} from the
+   * {@link Meta} yields a response with a null frame and both flags {@code true}; a
+   * {@link MissingResultsException} yields a null frame with only the second flag
+   * {@code true}.
+   */
+  public FetchResponse apply(FetchRequest request) {
+    final Meta.StatementHandle statement =
+        new Meta.StatementHandle(request.connectionId, request.statementId, null);
+    try {
+      final Meta.Frame rows = meta.fetch(statement, request.offset, request.fetchMaxRowCount);
+      return new FetchResponse(rows, false, false, serverLevelRpcMetadata);
+    } catch (NullPointerException | NoSuchStatementException e) {
+      // The Statement doesn't exist anymore, bubble up this information
+      return new FetchResponse(null, true, true, serverLevelRpcMetadata);
+    } catch (MissingResultsException e) {
+      return new FetchResponse(null, false, true, serverLevelRpcMetadata);
+    }
+  }
+
+  /**
+   * Executes a previously prepared statement; the call is timed.
+   *
+   * <p>When the {@link Meta} throws {@link NoSuchStatementException} the response carries no
+   * results and has its flag set to {@code true}.
+   */
+  public ExecuteResponse apply(ExecuteRequest request) {
+    try (final Context ignored = executeTimer.start()) {
+      try {
+        final Meta.ExecuteResult outcome = meta.execute(request.statementHandle,
+            request.parameterValues, request.maxRowCount);
+
+        final List<ResultSetResponse> responses = new ArrayList<>(outcome.resultSets.size());
+        for (Meta.MetaResultSet each : outcome.resultSets) {
+          responses.add(toResponse(each));
+        }
+        return new ExecuteResponse(responses, false, serverLevelRpcMetadata);
+      } catch (NoSuchStatementException e) {
+        return new ExecuteResponse(null, true, serverLevelRpcMetadata);
+      }
+    }
+  }
+
+  /** Creates a statement on the requested connection and reports its identifiers. */
+  public CreateStatementResponse apply(CreateStatementRequest request) {
+    final Meta.StatementHandle created =
+        meta.createStatement(new Meta.ConnectionHandle(request.connectionId));
+    return new CreateStatementResponse(created.connectionId, created.id,
+        serverLevelRpcMetadata);
+  }
+
+  /** Closes the requested statement on the {@link Meta}. */
+  public CloseStatementResponse apply(CloseStatementRequest request) {
+    meta.closeStatement(
+        new Meta.StatementHandle(request.connectionId, request.statementId, null));
+    return new CloseStatementResponse(serverLevelRpcMetadata);
+  }
+
+  /** Opens the requested connection on the {@link Meta} with the request's info. */
+  public OpenConnectionResponse apply(OpenConnectionRequest request) {
+    meta.openConnection(new Meta.ConnectionHandle(request.connectionId), request.info);
+    return new OpenConnectionResponse(serverLevelRpcMetadata);
+  }
+
+  /** Closes the requested connection on the {@link Meta}. */
+  public CloseConnectionResponse apply(CloseConnectionRequest request) {
+    meta.closeConnection(new Meta.ConnectionHandle(request.connectionId));
+    return new CloseConnectionResponse(serverLevelRpcMetadata);
+  }
+
+  /** Synchronizes connection properties with the {@link Meta}; the call is timed. */
+  public ConnectionSyncResponse apply(ConnectionSyncRequest request) {
+    try (final Context ignored = connectionSyncTimer.start()) {
+      final Meta.ConnectionHandle connection = new Meta.ConnectionHandle(request.connectionId);
+      final Meta.ConnectionProperties synced =
+          meta.connectionSync(connection, request.connProps);
+      return new ConnectionSyncResponse(synced, serverLevelRpcMetadata);
+    }
+  }
+
+  /** Answers with the database properties the {@link Meta} reports for the connection. */
+  public DatabasePropertyResponse apply(DatabasePropertyRequest request) {
+    return new DatabasePropertyResponse(
+        meta.getDatabaseProperties(new Meta.ConnectionHandle(request.connectionId)),
+        serverLevelRpcMetadata);
+  }
+
+  /**
+   * Asks the {@link Meta} to synchronize the results of a statement.
+   *
+   * <p>When the {@link Meta} throws {@link NoSuchStatementException} the response is built
+   * with {@code false} as its first flag and {@code true} as its second.
+   */
+  public SyncResultsResponse apply(SyncResultsRequest request) {
+    final Meta.StatementHandle statement =
+        new Meta.StatementHandle(request.connectionId, request.statementId, null);
+    try {
+      // Set success on the cached statement
+      return new SyncResultsResponse(
+          meta.syncResults(statement, request.state, request.offset), false,
+          serverLevelRpcMetadata);
+    } catch (NoSuchStatementException e) {
+      // Tried to sync results on a statement which wasn't cached
+      return new SyncResultsResponse(false, true, serverLevelRpcMetadata);
+    }
+  }
+
+  /** Commits on the requested connection; the call is timed. */
+  public CommitResponse apply(CommitRequest request) {
+    try (final Context ignored = commitTimer.start()) {
+      // If commit() errors, let the ErrorResponse be sent back via an uncaught Exception.
+      meta.commit(new Meta.ConnectionHandle(request.connectionId));
+      return new CommitResponse();
+    }
+  }
+
+  /** Rolls back on the requested connection. */
+  public RollbackResponse apply(RollbackRequest request) {
+    // If rollback() errors, let the ErrorResponse be sent back via an uncaught Exception.
+    meta.rollback(new Meta.ConnectionHandle(request.connectionId));
+    return new RollbackResponse();
+  }
+
+  /**
+   * Callback handed to {@link Meta#prepareAndExecute}: every notification is a no-op and
+   * the monitor is the {@link LocalService} class object.
+   */
+  private static final class NoopPrepareCallback implements Meta.PrepareCallback {
+    @Override public Object getMonitor() {
+      return LocalService.class;
+    }
+
+    @Override public void clear() {
+    }
+
+    @Override public void assign(Meta.Signature signature, Meta.Frame firstFrame,
+        long updateCount) {
+    }
+
+    @Override public void execute() {
+    }
+  }
+}
+
+// End LocalService.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java
new file mode 100644
index 0000000..12a5b59
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.proto.Common;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+
+/**
+ * Identifies an operation from {@link DatabaseMetaData} which returns a {@link ResultSet}. This
+ * enum is used to allow clients to request the server to re-instantiate a {@link ResultSet} for
+ * these operations which do not have a SQL string associated with them as a normal query does.
+ *
+ * <p>Each constant is declared together with its protocol buffer counterpart, so the mapping
+ * used by {@link #toProto()} and {@link #fromProto} is written down exactly once.
+ */
+public enum MetaDataOperation {
+  GET_ATTRIBUTES(Common.MetaDataOperation.GET_ATTRIBUTES),
+  GET_BEST_ROW_IDENTIFIER(Common.MetaDataOperation.GET_BEST_ROW_IDENTIFIER),
+  GET_CATALOGS(Common.MetaDataOperation.GET_CATALOGS),
+  GET_CLIENT_INFO_PROPERTIES(Common.MetaDataOperation.GET_CLIENT_INFO_PROPERTIES),
+  GET_COLUMN_PRIVILEGES(Common.MetaDataOperation.GET_COLUMN_PRIVILEGES),
+  GET_COLUMNS(Common.MetaDataOperation.GET_COLUMNS),
+  GET_CROSS_REFERENCE(Common.MetaDataOperation.GET_CROSS_REFERENCE),
+  GET_EXPORTED_KEYS(Common.MetaDataOperation.GET_EXPORTED_KEYS),
+  GET_FUNCTION_COLUMNS(Common.MetaDataOperation.GET_FUNCTION_COLUMNS),
+  GET_FUNCTIONS(Common.MetaDataOperation.GET_FUNCTIONS),
+  GET_IMPORTED_KEYS(Common.MetaDataOperation.GET_IMPORTED_KEYS),
+  GET_INDEX_INFO(Common.MetaDataOperation.GET_INDEX_INFO),
+  GET_PRIMARY_KEYS(Common.MetaDataOperation.GET_PRIMARY_KEYS),
+  GET_PROCEDURE_COLUMNS(Common.MetaDataOperation.GET_PROCEDURE_COLUMNS),
+  GET_PROCEDURES(Common.MetaDataOperation.GET_PROCEDURES),
+  GET_PSEUDO_COLUMNS(Common.MetaDataOperation.GET_PSEUDO_COLUMNS),
+  GET_SCHEMAS(Common.MetaDataOperation.GET_SCHEMAS),
+  GET_SCHEMAS_WITH_ARGS(Common.MetaDataOperation.GET_SCHEMAS_WITH_ARGS),
+  GET_SUPER_TABLES(Common.MetaDataOperation.GET_SUPER_TABLES),
+  GET_SUPER_TYPES(Common.MetaDataOperation.GET_SUPER_TYPES),
+  GET_TABLE_PRIVILEGES(Common.MetaDataOperation.GET_TABLE_PRIVILEGES),
+  GET_TABLES(Common.MetaDataOperation.GET_TABLES),
+  GET_TABLE_TYPES(Common.MetaDataOperation.GET_TABLE_TYPES),
+  GET_TYPE_INFO(Common.MetaDataOperation.GET_TYPE_INFO),
+  GET_UDTS(Common.MetaDataOperation.GET_UDTS),
+  GET_VERSION_COLUMNS(Common.MetaDataOperation.GET_VERSION_COLUMNS);
+
+  /** The protocol buffer constant this operation is sent as. */
+  private final Common.MetaDataOperation proto;
+
+  MetaDataOperation(Common.MetaDataOperation proto) {
+    this.proto = proto;
+  }
+
+  /**
+   * Returns the protocol buffer constant that corresponds to this operation.
+   *
+   * @return the protocol buffer counterpart declared with this constant
+   */
+  public Common.MetaDataOperation toProto() {
+    return proto;
+  }
+
+  /**
+   * Returns the operation that corresponds to a protocol buffer constant.
+   *
+   * @param protoOp the protocol buffer constant, may be null
+   * @return the matching operation, or null when {@code protoOp} is null
+   * @throws RuntimeException if no operation is declared for {@code protoOp}
+   */
+  public static MetaDataOperation fromProto(Common.MetaDataOperation protoOp) {
+    // Null is acceptable
+    if (null == protoOp) {
+      return null;
+    }
+
+    // Enum constants are singletons, so an identity comparison is sufficient.
+    for (MetaDataOperation operation : values()) {
+      if (operation.proto == protoOp) {
+        return operation;
+      }
+    }
+    throw new RuntimeException("Unknown type: " + protoOp);
+  }
+}
+
+// End MetaDataOperation.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java
new file mode 100644
index 0000000..2561b29
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+/**
+ * Static helpers shared by the code that names metrics.
+ */
+public class MetricsHelper {
+
+  private static final String PERIOD = ".";
+
+  private MetricsHelper() {}
+
+  /**
+   * Builds a metric name by joining the name of {@code clz}, as returned by
+   * {@link Class#getName()}, and {@code name} with a period.
+   *
+   * @param clz the class whose name forms the prefix
+   * @param name the part appended after the period
+   * @return the class name, a period, then {@code name}
+   */
+  public static String concat(Class<?> clz, String name) {
+    return clz.getName() + PERIOD + name;
+  }
+
+}
+
+// End MetricsHelper.java