You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/03/07 19:28:15 UTC
[37/59] [partial] calcite git commit: [CALCITE-1078] Detach avatica
from the core calcite Maven project
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java
new file mode 100644
index 0000000..1968fcd
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractHandler.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaSeverity;
+import org.apache.calcite.avatica.NoSuchConnectionException;
+import org.apache.calcite.avatica.remote.Service.ErrorResponse;
+import org.apache.calcite.avatica.remote.Service.Request;
+import org.apache.calcite.avatica.remote.Service.Response;
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for {@link Handler}s to extend to inherit functionality common across
+ * serialization strategies.
+ *
+ * @param <T> The format Requests/Responses are serialized as.
+ */
+public abstract class AbstractHandler<T> implements Handler<T> {
+ private static final String NULL_EXCEPTION_MESSAGE = "(null exception message)";
+ protected final Service service;
+ private RpcMetadataResponse metadata = null;
+
+ public AbstractHandler(Service service) {
+ this.service = service;
+ }
+
+ abstract Request decode(T serializedRequest) throws IOException;
+
+ /**
+ * Serialize the given {@link Response} per the concrete {@link Handler} implementation.
+ *
+ * @param response The {@link Response} to serialize.
+ * @return A serialized representation of the {@link Response}.
+ * @throws IOException
+ */
+ abstract T encode(Response response) throws IOException;
+
+ /**
+ * Unwrap Avatica-specific context about a given exception.
+ *
+ * @param e A caught exception throw by Avatica implementation.
+ * @return An {@link ErrorResponse}.
+ */
+ ErrorResponse unwrapException(Exception e) {
+ // By default, we know nothing extra.
+ int errorCode = ErrorResponse.UNKNOWN_ERROR_CODE;
+ String sqlState = ErrorResponse.UNKNOWN_SQL_STATE;
+ AvaticaSeverity severity = AvaticaSeverity.UNKNOWN;
+ String errorMsg = null;
+
+ // Extract the contextual information if we have it. We may not.
+ if (e instanceof AvaticaRuntimeException) {
+ AvaticaRuntimeException rte = (AvaticaRuntimeException) e;
+ errorCode = rte.getErrorCode();
+ sqlState = rte.getSqlState();
+ severity = rte.getSeverity();
+ errorMsg = rte.getErrorMessage();
+ } else if (e instanceof NoSuchConnectionException) {
+ errorCode = ErrorResponse.MISSING_CONNECTION_ERROR_CODE;
+ severity = AvaticaSeverity.ERROR;
+ errorMsg = e.getMessage();
+ } else {
+ // Try to construct a meaningful error message when the server impl doesn't provide one.
+ errorMsg = getCausalChain(e);
+ }
+
+ return new ErrorResponse(e, errorMsg, errorCode, sqlState, severity, metadata);
+ }
+
+ /**
+ * Compute a response for the given request, handling errors generated by that computation.
+ *
+ * @param serializedRequest The caller's request.
+ * @return A {@link Response} with additional context about that response.
+ */
+ public HandlerResponse<T> apply(T serializedRequest) {
+ final Service.Request request;
+ try {
+ request = decode(serializedRequest);
+ } catch (IOException e) {
+ // TODO provide a canned ErrorResponse.
+ throw new RuntimeException(e);
+ }
+
+ try {
+ final Service.Response response = request.accept(service);
+ return new HandlerResponse<>(encode(response), HTTP_OK);
+ } catch (Exception e) {
+ ErrorResponse errorResp = unwrapException(e);
+
+ try {
+ return new HandlerResponse<>(encode(errorResp), HTTP_INTERNAL_SERVER_ERROR);
+ } catch (IOException e1) {
+ // TODO provide a canned ErrorResponse
+
+ // If we can't serialize error message to JSON, can't give a meaningful error to caller.
+ // Just try to not unnecessarily create more exceptions.
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
+
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Constructs a message for the summary of an Exception.
+ *
+ * @param e The Exception to summarize.
+ * @return A summary message for the Exception.
+ */
+ private String getCausalChain(Exception e) {
+ StringBuilder sb = new StringBuilder(16);
+ Throwable curr = e;
+ // Could use Guava, but that would increase dependency set unnecessarily.
+ while (null != curr) {
+ if (sb.length() > 0) {
+ sb.append(" -> ");
+ }
+ String message = curr.getMessage();
+ sb.append(curr.getClass().getSimpleName()).append(": ");
+ sb.append(null == message ? NULL_EXCEPTION_MESSAGE : message);
+ curr = curr.getCause();
+ }
+ if (sb.length() == 0) {
+ // Catch the case where we have no error message.
+ return "Unknown error message";
+ }
+ return sb.toString();
+ }
+
+ @Override public void setRpcMetadata(RpcMetadataResponse metadata) {
+ this.metadata = metadata;
+ }
+}
+
+// End AbstractHandler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
new file mode 100644
index 0000000..ffaa360
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A common base class for {@link Service} implementations that implement
+ * modifications made to response objects.
+ */
+public abstract class AbstractService implements Service {
+
+ private RpcMetadataResponse rpcMetadata = null;
+
+ /**
+ * Represents the serialization of the data over a transport.
+ */
+ enum SerializationType {
+ JSON,
+ PROTOBUF
+ }
+
+ /**
+ * @return The manner in which the data is serialized.
+ */
+ abstract SerializationType getSerializationType();
+
+ /** Modifies a signature, changing the representation of numeric columns
+ * within it. This deals with the fact that JSON transmits a small long value,
+ * or a float which is a whole number, as an integer. Thus the accessors need
+ * be prepared to accept any numeric type. */
+ Meta.Signature finagle(Meta.Signature signature) {
+ final List<ColumnMetaData> columns = new ArrayList<>();
+ for (ColumnMetaData column : signature.columns) {
+ columns.add(finagle(column));
+ }
+ if (columns.equals(signature.columns)) {
+ return signature;
+ }
+ return new Meta.Signature(columns, signature.sql,
+ signature.parameters, signature.internalParameters,
+ signature.cursorFactory, signature.statementType);
+ }
+
+ ColumnMetaData finagle(ColumnMetaData column) {
+ switch (column.type.rep) {
+ case BYTE:
+ case PRIMITIVE_BYTE:
+ case DOUBLE:
+ case PRIMITIVE_DOUBLE:
+ case FLOAT:
+ case PRIMITIVE_FLOAT:
+ case INTEGER:
+ case PRIMITIVE_INT:
+ case SHORT:
+ case PRIMITIVE_SHORT:
+ case LONG:
+ case PRIMITIVE_LONG:
+ return column.setRep(ColumnMetaData.Rep.NUMBER);
+ default:
+ // continue
+ break;
+ }
+ switch (column.type.id) {
+ case Types.VARBINARY:
+ case Types.BINARY:
+ switch (getSerializationType()) {
+ case JSON:
+ return column.setRep(ColumnMetaData.Rep.STRING);
+ case PROTOBUF:
+ return column;
+ default:
+ throw new IllegalStateException("Unhadled case statement");
+ }
+ case Types.DECIMAL:
+ case Types.NUMERIC:
+ return column.setRep(ColumnMetaData.Rep.NUMBER);
+ default:
+ return column;
+ }
+ }
+
+ PrepareResponse finagle(PrepareResponse response) {
+ final Meta.StatementHandle statement = finagle(response.statement);
+ if (statement == response.statement) {
+ return response;
+ }
+ return new PrepareResponse(statement, rpcMetadata);
+ }
+
+ Meta.StatementHandle finagle(Meta.StatementHandle h) {
+ final Meta.Signature signature = finagle(h.signature);
+ if (signature == h.signature) {
+ return h;
+ }
+ return new Meta.StatementHandle(h.connectionId, h.id, signature);
+ }
+
+ ResultSetResponse finagle(ResultSetResponse r) {
+ if (r.updateCount != -1) {
+ assert r.signature == null;
+ return r;
+ }
+ if (r.signature == null) {
+ return r;
+ }
+ final Meta.Signature signature = finagle(r.signature);
+ if (signature == r.signature) {
+ return r;
+ }
+ return new ResultSetResponse(r.connectionId, r.statementId, r.ownStatement,
+ signature, r.firstFrame, r.updateCount, rpcMetadata);
+ }
+
+ ExecuteResponse finagle(ExecuteResponse r) {
+ if (r.missingStatement) {
+ return r;
+ }
+ final List<ResultSetResponse> results = new ArrayList<>();
+ int changeCount = 0;
+ for (ResultSetResponse result : r.results) {
+ ResultSetResponse result2 = finagle(result);
+ if (result2 != result) {
+ ++changeCount;
+ }
+ results.add(result2);
+ }
+ if (changeCount == 0) {
+ return r;
+ }
+ return new ExecuteResponse(results, r.missingStatement, rpcMetadata);
+ }
+
+ @Override public void setRpcMetadata(RpcMetadataResponse metadata) {
+ // OK if this is null
+ this.rpcMetadata = metadata;
+ }
+}
+
+// End AbstractService.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
new file mode 100644
index 0000000..3bda248
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaCommonsHttpClientImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.http.ConnectionReuseStrategy;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.protocol.RequestExpectContinue;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.pool.BasicConnFactory;
+import org.apache.http.impl.pool.BasicConnPool;
+import org.apache.http.impl.pool.BasicPoolEntry;
+import org.apache.http.message.BasicHttpEntityEnclosingRequest;
+import org.apache.http.protocol.HttpCoreContext;
+import org.apache.http.protocol.HttpProcessor;
+import org.apache.http.protocol.HttpProcessorBuilder;
+import org.apache.http.protocol.HttpRequestExecutor;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.http.util.EntityUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.concurrent.Future;
+
+/**
+ * A common class to invoke HTTP requests against the Avatica server agnostic of the data being
+ * sent and received across the wire.
+ */
+public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient {
+ private static final Logger LOG = LoggerFactory.getLogger(AvaticaCommonsHttpClientImpl.class);
+ private static final ConnectionReuseStrategy REUSE = DefaultConnectionReuseStrategy.INSTANCE;
+
+ // Some basic exposed configurations
+ private static final String MAX_POOLED_CONNECTION_PER_ROUTE_KEY =
+ "avatica.pooled.connections.per.route";
+ private static final String MAX_POOLED_CONNECTION_PER_ROUTE_DEFAULT = "4";
+ private static final String MAX_POOLED_CONNECTIONS_KEY = "avatica.pooled.connections.max";
+ private static final String MAX_POOLED_CONNECTIONS_DEFAULT = "16";
+
+ protected final HttpHost host;
+ protected final HttpProcessor httpProcessor;
+ protected final HttpRequestExecutor httpExecutor;
+ protected final BasicConnPool httpPool;
+
+ public AvaticaCommonsHttpClientImpl(URL url) {
+ this.host = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
+
+ this.httpProcessor = HttpProcessorBuilder.create()
+ .add(new RequestContent())
+ .add(new RequestTargetHost())
+ .add(new RequestConnControl())
+ .add(new RequestExpectContinue()).build();
+
+ this.httpExecutor = new HttpRequestExecutor();
+
+ this.httpPool = new BasicConnPool(new BasicConnFactory());
+ int maxPerRoute = Integer.parseInt(
+ System.getProperty(MAX_POOLED_CONNECTION_PER_ROUTE_KEY,
+ MAX_POOLED_CONNECTION_PER_ROUTE_DEFAULT));
+ int maxTotal = Integer.parseInt(
+ System.getProperty(MAX_POOLED_CONNECTIONS_KEY,
+ MAX_POOLED_CONNECTIONS_DEFAULT));
+ httpPool.setDefaultMaxPerRoute(maxPerRoute);
+ httpPool.setMaxTotal(maxTotal);
+ }
+
+ public byte[] send(byte[] request) {
+ while (true) {
+ boolean reusable = false;
+ // Get a connection from the pool
+ Future<BasicPoolEntry> future = this.httpPool.lease(host, null);
+ BasicPoolEntry entry = null;
+ try {
+ entry = future.get();
+ HttpCoreContext coreContext = HttpCoreContext.create();
+ coreContext.setTargetHost(host);
+
+ HttpClientConnection conn = entry.getConnection();
+
+ ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM);
+
+ BasicHttpEntityEnclosingRequest postRequest =
+ new BasicHttpEntityEnclosingRequest("POST", "/");
+ postRequest.setEntity(entity);
+
+ httpExecutor.preProcess(postRequest, httpProcessor, coreContext);
+ HttpResponse response = httpExecutor.execute(postRequest, conn, coreContext);
+ httpExecutor.postProcess(response, httpProcessor, coreContext);
+
+ // Should the connection be kept alive?
+ reusable = REUSE.keepAlive(response, coreContext);
+
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (HttpURLConnection.HTTP_UNAVAILABLE == statusCode) {
+ // Could be sitting behind a load-balancer, try again.
+ continue;
+ }
+
+ return EntityUtils.toByteArray(response.getEntity());
+ } catch (Exception e) {
+ LOG.debug("Failed to execute HTTP request", e);
+ throw new RuntimeException(e);
+ } finally {
+ // Release the connection back to the pool, marking if it's good to reuse or not.
+ httpPool.release(entry, reusable);
+ }
+ }
+ }
+}
+
+// End AvaticaCommonsHttpClientImpl.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java
new file mode 100644
index 0000000..eac1b74
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClient.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+/**
+ * An interface which defines how requests are sent to the Avatica server.
+ */
+public interface AvaticaHttpClient {
+
+ /**
+ * Sends a serialized request to the Avatica server.
+ *
+ * @param request The serialized request.
+ * @return The serialized response.
+ */
+ byte[] send(byte[] request);
+
+}
+
+// End AvaticaHttpClient.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java
new file mode 100644
index 0000000..b5d213a
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+import java.net.URL;
+
+/**
+ * A factory for constructing {@link AvaticaHttpClient}'s.
+ */
+public interface AvaticaHttpClientFactory {
+
+ /**
+ * Construct the appropriate implementation of {@link AvaticaHttpClient}.
+ *
+ * @param url URL that the client is for.
+ * @param config Configuration to use when constructing the implementation.
+ * @return An instance of {@link AvaticaHttpClient}.
+ */
+ AvaticaHttpClient getClient(URL url, ConnectionConfig config);
+
+}
+
+// End AvaticaHttpClientFactory.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
new file mode 100644
index 0000000..cd6cbce
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientFactoryImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ConnectionConfig;
+
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.util.Objects;
+
+/**
+ * Default implementation of {@link AvaticaHttpClientFactory} which chooses an implementation
+ * from a property.
+ */
+public class AvaticaHttpClientFactoryImpl implements AvaticaHttpClientFactory {
+ public static final String HTTP_CLIENT_IMPL_DEFAULT =
+ AvaticaCommonsHttpClientImpl.class.getName();
+
+ // Public for Type.PLUGIN
+ public static final AvaticaHttpClientFactoryImpl INSTANCE = new AvaticaHttpClientFactoryImpl();
+
+ // Public for Type.PLUGIN
+ public AvaticaHttpClientFactoryImpl() {}
+
+ /**
+ * Returns a singleton instance of {@link AvaticaHttpClientFactoryImpl}.
+ *
+ * @return A singleton instance.
+ */
+ public static AvaticaHttpClientFactoryImpl getInstance() {
+ return INSTANCE;
+ }
+
+ @Override public AvaticaHttpClient getClient(URL url, ConnectionConfig config) {
+ String className = config.httpClientClass();
+ if (null == className) {
+ className = HTTP_CLIENT_IMPL_DEFAULT;
+ }
+
+ try {
+ Class<?> clz = Class.forName(className);
+ Constructor<?> constructor = clz.getConstructor(URL.class);
+ Object instance = constructor.newInstance(Objects.requireNonNull(url));
+ return AvaticaHttpClient.class.cast(instance);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to construct AvaticaHttpClient implementation "
+ + className, e);
+ }
+ }
+}
+
+// End AvaticaHttpClientFactoryImpl.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java
new file mode 100644
index 0000000..c100eec
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaHttpClientImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * A common class to invoke HTTP requests against the Avatica server agnostic of the data being
+ * sent and received across the wire.
+ */
+public class AvaticaHttpClientImpl implements AvaticaHttpClient {
+ protected final URL url;
+
+ public AvaticaHttpClientImpl(URL url) {
+ this.url = url;
+ }
+
+ public byte[] send(byte[] request) {
+ // TODO back-off policy?
+ while (true) {
+ try {
+ final HttpURLConnection connection = openConnection();
+ connection.setRequestMethod("POST");
+ connection.setDoInput(true);
+ connection.setDoOutput(true);
+ try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) {
+ wr.write(request);
+ wr.flush();
+ wr.close();
+ }
+ final int responseCode = connection.getResponseCode();
+ final InputStream inputStream;
+ if (responseCode == HttpURLConnection.HTTP_UNAVAILABLE) {
+ // Could be sitting behind a load-balancer, try again.
+ continue;
+ } else if (responseCode != HttpURLConnection.HTTP_OK) {
+ inputStream = connection.getErrorStream();
+ } else {
+ inputStream = connection.getInputStream();
+ }
+ return AvaticaUtils.readFullyToBytes(inputStream);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ HttpURLConnection openConnection() throws IOException {
+ return (HttpURLConnection) url.openConnection();
+ }
+}
+
+// End AvaticaHttpClientImpl.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java
new file mode 100644
index 0000000..d5ae9b1
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionConfigImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ConnectionConfigImpl;
+
+import java.util.Properties;
+
+/** Implementation of {@link org.apache.calcite.avatica.ConnectionConfig}
+ * with extra properties specific to Remote Driver. */
+public class AvaticaRemoteConnectionConfigImpl extends ConnectionConfigImpl {
+ public AvaticaRemoteConnectionConfigImpl(Properties properties) {
+ super(properties);
+ }
+
+ public Service.Factory factory() {
+ return AvaticaRemoteConnectionProperty.FACTORY.wrap(properties)
+ .getPlugin(Service.Factory.class, null);
+ }
+}
+
+// End AvaticaRemoteConnectionConfigImpl.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
new file mode 100644
index 0000000..5e755f8
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ConnectionProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.calcite.avatica.ConnectionConfigImpl.PropEnv;
+import static org.apache.calcite.avatica.ConnectionConfigImpl.parse;
+
+/**
+ * Enumeration of Avatica remote driver's built-in connection properties.
+ */
+public enum AvaticaRemoteConnectionProperty implements ConnectionProperty {
+ /** Factory. */
+ FACTORY("factory", Type.STRING, null);
+
+ private final String camelName;
+ private final Type type;
+ private final Object defaultValue;
+
+ private static final Map<String, AvaticaRemoteConnectionProperty> NAME_TO_PROPS;
+
+ static {
+ NAME_TO_PROPS = new HashMap<String, AvaticaRemoteConnectionProperty>();
+ for (AvaticaRemoteConnectionProperty p
+ : AvaticaRemoteConnectionProperty.values()) {
+ NAME_TO_PROPS.put(p.camelName.toUpperCase(), p);
+ NAME_TO_PROPS.put(p.name(), p);
+ }
+ }
+
+ AvaticaRemoteConnectionProperty(String camelName,
+ Type type,
+ Object defaultValue) {
+ this.camelName = camelName;
+ this.type = type;
+ this.defaultValue = defaultValue;
+ assert defaultValue == null || type.valid(defaultValue);
+ }
+
+ public String camelName() {
+ return camelName;
+ }
+
+ public Object defaultValue() {
+ return defaultValue;
+ }
+
+ public Type type() {
+ return type;
+ }
+
+ public PropEnv wrap(Properties properties) {
+ return new PropEnv(parse(properties, NAME_TO_PROPS), this);
+ }
+
+ public boolean required() {
+ return false;
+ }
+}
+
+// End AvaticaRemoteConnectionProperty.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java
new file mode 100644
index 0000000..2f9a1cd
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/AvaticaRuntimeException.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaSeverity;
+import org.apache.calcite.avatica.remote.Service.ErrorResponse;
+
+import java.util.Objects;
+
+/**
+ * A {@link RuntimeException} thrown by Avatica with additional contextual information about
+ * what happened to cause the Exception.
+ */
+public class AvaticaRuntimeException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ private final String errorMessage;
+ private final int errorCode;
+ private final String sqlState;
+ private final AvaticaSeverity severity;
+
+ /**
+ * Constructs an {@code AvaticaRuntimeException} with no additional information.
+ *
+ * <p>It is strongly preferred that the caller invoke
+ * {@link #AvaticaRuntimeException(String, int, String, AvaticaSeverity)}
+ * with proper contextual information.
+ */
+ public AvaticaRuntimeException() {
+ this("No additional context on exception", ErrorResponse.UNKNOWN_ERROR_CODE,
+ ErrorResponse.UNKNOWN_SQL_STATE, AvaticaSeverity.UNKNOWN);
+ }
+
+ /**
+ * Constructs an {@code AvaticaRuntimeException} with the given
+ * contextual information surrounding the error.
+ *
+ * @param errorMessage A human-readable explanation about what happened
+ * @param errorCode Numeric identifier for error
+ * @param sqlState 5-character identifier for error
+ * @param severity Severity
+ */
+ public AvaticaRuntimeException(String errorMessage, int errorCode, String sqlState,
+ AvaticaSeverity severity) {
+ this.errorMessage = Objects.requireNonNull(errorMessage);
+ this.errorCode = errorCode;
+ this.sqlState = Objects.requireNonNull(sqlState);
+ this.severity = Objects.requireNonNull(severity);
+ }
+
+ /**
+ * Returns a human-readable error message.
+ */
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ /**
+ * Returns a numeric code for this error.
+ */
+ public int getErrorCode() {
+ return errorCode;
+ }
+
+ /**
+ * Returns the five-character identifier for this error.
+ */
+ public String getSqlState() {
+ return sqlState;
+ }
+
+ /**
+ * Returns the severity at which this exception is thrown.
+ */
+ public AvaticaSeverity getSeverity() {
+ return severity;
+ }
+
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder(64);
+ return sb.append("AvaticaRuntimeException: [")
+ .append("Messsage: '").append(errorMessage).append("', ")
+ .append("Error code: '").append(errorCode).append("', ")
+ .append("SQL State: '").append(sqlState).append("', ")
+ .append("Severity: '").append(severity).append("']").toString();
+ }
+}
+
+// End AvaticaRuntimeException.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java
new file mode 100644
index 0000000..e98c486
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Driver.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.BuiltInConnectionProperty;
+import org.apache.calcite.avatica.ConnectionConfig;
+import org.apache.calcite.avatica.ConnectionProperty;
+import org.apache.calcite.avatica.DriverVersion;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.UnregisteredDriver;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Avatica Remote JDBC driver.
+ */
+public class Driver extends UnregisteredDriver {
+ private static final Logger LOG = LoggerFactory.getLogger(Driver.class);
+ public static final String CONNECT_STRING_PREFIX = "jdbc:avatica:remote:";
+
+ static {
+ new Driver().register();
+ }
+
+ public Driver() {
+ super();
+ }
+
+ /**
+ * Defines the method of message serialization used by the Driver
+ */
+ public static enum Serialization {
+ JSON,
+ PROTOBUF;
+ }
+
+ @Override protected String getConnectStringPrefix() {
+ return CONNECT_STRING_PREFIX;
+ }
+
+ protected DriverVersion createDriverVersion() {
+ return DriverVersion.load(
+ Driver.class,
+ "org-apache-calcite-jdbc.properties",
+ "Avatica Remote JDBC Driver",
+ "unknown version",
+ "Avatica",
+ "unknown version");
+ }
+
+ @Override protected Collection<ConnectionProperty> getConnectionProperties() {
+ final List<ConnectionProperty> list = new ArrayList<ConnectionProperty>();
+ Collections.addAll(list, BuiltInConnectionProperty.values());
+ Collections.addAll(list, AvaticaRemoteConnectionProperty.values());
+ return list;
+ }
+
+ @Override public Meta createMeta(AvaticaConnection connection) {
+ final ConnectionConfig config = connection.config();
+ final Service service = createService(connection, config);
+ return new RemoteMeta(connection, service);
+ }
+
+ /**
+ * Creates a {@link Service} with the given {@link AvaticaConnection} and configuration.
+ *
+ * @param connection The {@link AvaticaConnection} to use.
+ * @param config Configuration properties
+ * @return A Service implementation.
+ */
+ Service createService(AvaticaConnection connection, ConnectionConfig config) {
+ final Service.Factory metaFactory = config.factory();
+ final Service service;
+ if (metaFactory != null) {
+ service = metaFactory.create(connection);
+ } else if (config.url() != null) {
+ final AvaticaHttpClient httpClient = getHttpClient(connection, config);
+ final Serialization serializationType = getSerialization(config);
+
+ LOG.debug("Instantiating {} service", serializationType);
+ switch (serializationType) {
+ case JSON:
+ service = new RemoteService(httpClient);
+ break;
+ case PROTOBUF:
+ service = new RemoteProtobufService(httpClient, new ProtobufTranslationImpl());
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled serialization type: " + serializationType);
+ }
+ } else {
+ service = new MockJsonService(Collections.<String, String>emptyMap());
+ }
+ return service;
+ }
+
+ /**
+ * Creates the HTTP client that communicates with the Avatica server.
+ *
+ * @param connection The {@link AvaticaConnection}.
+ * @param config The configuration.
+ * @return An {@link AvaticaHttpClient} implementation.
+ */
+ AvaticaHttpClient getHttpClient(AvaticaConnection connection, ConnectionConfig config) {
+ URL url;
+ try {
+ url = new URL(config.url());
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+
+ AvaticaHttpClientFactory httpClientFactory = config.httpClientFactory();
+
+ return httpClientFactory.getClient(url, config);
+ }
+
+ @Override public Connection connect(String url, Properties info)
+ throws SQLException {
+ AvaticaConnection conn = (AvaticaConnection) super.connect(url, info);
+ if (conn == null) {
+ // It's not an url for our driver
+ return null;
+ }
+
+ // Create the corresponding remote connection
+ ConnectionConfig config = conn.config();
+ Service service = createService(conn, config);
+
+ service.apply(
+ new Service.OpenConnectionRequest(conn.id,
+ Service.OpenConnectionRequest.serializeProperties(info)));
+
+ return conn;
+ }
+
+ Serialization getSerialization(ConnectionConfig config) {
+ final String serializationStr = config.serialization();
+ Serialization serializationType = Serialization.JSON;
+ if (null != serializationStr) {
+ try {
+ serializationType = Serialization.valueOf(serializationStr.toUpperCase());
+ } catch (Exception e) {
+ // Log a warning instead of failing harshly? Intentionally no loggers available?
+ throw new RuntimeException(e);
+ }
+ }
+
+ return serializationType;
+ }
+}
+
+// End Driver.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java
new file mode 100644
index 0000000..30d026c
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Handler.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+
+import java.util.Objects;
+
+/**
+ * API for text request-response calls to an Avatica server.
+ *
+ * @param <T> The type this handler accepts and returns
+ */
+public interface Handler<T> {
+ int HTTP_OK = 200;
+ int HTTP_INTERNAL_SERVER_ERROR = 500;
+ String HANDLER_SERIALIZATION_METRICS_NAME = "Handler.Serialization";
+
+ /**
+ * Struct that encapsulates the context of the result of a request to Avatica.
+ */
+ public class HandlerResponse<T> {
+ private final T response;
+ private final int statusCode;
+
+ public HandlerResponse(T response, int statusCode) {
+ this.response = Objects.requireNonNull(response);
+ this.statusCode = statusCode;
+ }
+
+ public T getResponse() {
+ return response;
+ }
+
+ public int getStatusCode() {
+ return statusCode;
+ }
+
+ @Override public String toString() {
+ return "Response: " + response + ", Status:" + statusCode;
+ }
+ }
+
+ HandlerResponse<T> apply(T request);
+
+ /**
+ * Sets some general server information to return to the client in all responses.
+ *
+ * @param metadata Server-wide information
+ */
+ void setRpcMetadata(RpcMetadataResponse metadata);
+}
+
+// End Handler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
new file mode 100644
index 0000000..fd57078
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.Timer;
+import org.apache.calcite.avatica.metrics.Timer.Context;
+import org.apache.calcite.avatica.remote.Service.Request;
+import org.apache.calcite.avatica.remote.Service.Response;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * Implementation of {@link org.apache.calcite.avatica.remote.Handler}
+ * that decodes JSON requests, sends them to a {@link Service},
+ * and encodes the responses into JSON.
+ *
+ * @see org.apache.calcite.avatica.remote.JsonService
+ */
+public class JsonHandler extends AbstractHandler<String> {
+
+ protected static final ObjectMapper MAPPER = JsonService.MAPPER;
+
+ final MetricsSystem metrics;
+ final Timer serializationTimer;
+
+ public JsonHandler(Service service, MetricsSystem metrics) {
+ super(service);
+ this.metrics = metrics;
+ this.serializationTimer = this.metrics.getTimer(
+ MetricsHelper.concat(JsonHandler.class, HANDLER_SERIALIZATION_METRICS_NAME));
+ }
+
+ public HandlerResponse<String> apply(String jsonRequest) {
+ return super.apply(jsonRequest);
+ }
+
+ @Override Request decode(String request) throws IOException {
+ try (final Context ctx = serializationTimer.start()) {
+ return MAPPER.readValue(request, Service.Request.class);
+ }
+ }
+
+ /**
+ * Serializes the provided object as JSON.
+ *
+ * @param response The object to serialize.
+ * @return A JSON string.
+ */
+ @Override String encode(Response response) throws IOException {
+ try (final Context ctx = serializationTimer.start()) {
+ final StringWriter w = new StringWriter();
+ MAPPER.writeValue(w, response);
+ return w.toString();
+ }
+ }
+}
+
+// End JsonHandler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
new file mode 100644
index 0000000..668b3be
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * Implementation of {@link org.apache.calcite.avatica.remote.Service}
+ * that encodes requests and responses as JSON.
+ */
+public abstract class JsonService extends AbstractService {
+ public static final ObjectMapper MAPPER;
+ static {
+ MAPPER = new ObjectMapper();
+ MAPPER.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+ MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
+ MAPPER.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
+ }
+
+ public JsonService() {
+ }
+
+ /** Derived class should implement this method to transport requests and
+ * responses to and from the peer service. */
+ public abstract String apply(String request);
+
+ @Override SerializationType getSerializationType() {
+ return SerializationType.JSON;
+ }
+
+ //@VisibleForTesting
+ protected static <T> T decode(String response, Class<T> expectedType)
+ throws IOException {
+ Response resp = MAPPER.readValue(response, Response.class);
+ if (resp instanceof ErrorResponse) {
+ throw ((ErrorResponse) resp).toException();
+ } else if (!expectedType.isAssignableFrom(resp.getClass())) {
+ throw new ClassCastException("Cannot cast " + resp.getClass() + " into " + expectedType);
+ }
+
+ return expectedType.cast(resp);
+ }
+
+ //@VisibleForTesting
+ protected static <T> String encode(T request) throws IOException {
+ final StringWriter w = new StringWriter();
+ MAPPER.writeValue(w, request);
+ return w.toString();
+ }
+
+ protected RuntimeException handle(IOException e) {
+ return new RuntimeException(e);
+ }
+
+ public ResultSetResponse apply(CatalogsRequest request) {
+ try {
+ return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public ResultSetResponse apply(SchemasRequest request) {
+ try {
+ return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public ResultSetResponse apply(TablesRequest request) {
+ try {
+ return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public ResultSetResponse apply(TableTypesRequest request) {
+ try {
+ return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public ResultSetResponse apply(TypeInfoRequest request) {
+ try {
+ return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public ResultSetResponse apply(ColumnsRequest request) {
+ try {
+ return finagle(decode(apply(encode(request)), ResultSetResponse.class));
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public PrepareResponse apply(PrepareRequest request) {
+ try {
+ return finagle(decode(apply(encode(request)), PrepareResponse.class));
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public ExecuteResponse apply(PrepareAndExecuteRequest request) {
+ try {
+ return finagle(decode(apply(encode(request)), ExecuteResponse.class));
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public FetchResponse apply(FetchRequest request) {
+ try {
+ return decode(apply(encode(request)), FetchResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public ExecuteResponse apply(ExecuteRequest request) {
+ try {
+ return finagle(decode(apply(encode(request)), ExecuteResponse.class));
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public CreateStatementResponse apply(CreateStatementRequest request) {
+ try {
+ return decode(apply(encode(request)), CreateStatementResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public CloseStatementResponse apply(CloseStatementRequest request) {
+ try {
+ return decode(apply(encode(request)), CloseStatementResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public OpenConnectionResponse apply(OpenConnectionRequest request) {
+ try {
+ return decode(apply(encode(request)), OpenConnectionResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public CloseConnectionResponse apply(CloseConnectionRequest request) {
+ try {
+ return decode(apply(encode(request)), CloseConnectionResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public ConnectionSyncResponse apply(ConnectionSyncRequest request) {
+ try {
+ return decode(apply(encode(request)), ConnectionSyncResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public DatabasePropertyResponse apply(DatabasePropertyRequest request) {
+ try {
+ return decode(apply(encode(request)), DatabasePropertyResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public SyncResultsResponse apply(SyncResultsRequest request) {
+ try {
+ return decode(apply(encode(request)), SyncResultsResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public CommitResponse apply(CommitRequest request) {
+ try {
+ return decode(apply(encode(request)), CommitResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+
+ public RollbackResponse apply(RollbackRequest request) {
+ try {
+ return decode(apply(encode(request)), RollbackResponse.class);
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+}
+
+// End JsonService.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java
new file mode 100644
index 0000000..0af6300
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * Implementation of {@link org.apache.calcite.avatica.remote.Service}
+ * that goes to an in-process instance of {@code Service}.
+ */
+public class LocalJsonService extends JsonService {
+ private final Service service;
+
+ public LocalJsonService(Service service) {
+ this.service = service;
+ }
+
+ @Override public String apply(String request) {
+ try {
+ Request request2 = MAPPER.readValue(request, Request.class);
+ Response response2 = request2.accept(service);
+ final StringWriter w = new StringWriter();
+ MAPPER.writeValue(w, response2);
+ return w.toString();
+ } catch (IOException e) {
+ throw handle(e);
+ }
+ }
+}
+
+// End LocalJsonService.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
new file mode 100644
index 0000000..76e2392
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import java.io.IOException;
+
+/**
+ * A Service implementation that performs protocol buffer serialization on request and responses
+ * on either side of computing a response from a request to mimic some transport to a server which
+ * would normally perform such computation.
+ */
+public class LocalProtobufService extends ProtobufService {
+ private final Service service;
+ private final ProtobufTranslation translation;
+
+ public LocalProtobufService(Service service, ProtobufTranslation translation) {
+ this.service = service;
+ this.translation = translation;
+ }
+
+ @Override public Response _apply(Request request) {
+ try {
+ // Serialize the request to "send to the server"
+ byte[] serializedRequest = translation.serializeRequest(request);
+
+ // *some transport would normally happen here*
+
+ // Fake deserializing that request somewhere else
+ Request request2 = translation.parseRequest(serializedRequest);
+
+ // Serialize the response from the service to "send to the client"
+ byte[] serializedResponse = translation.serializeResponse(request2.accept(service));
+
+ // *some transport would normally happen here*
+
+ // Deserialize the response on "the client"
+ return translation.parseResponse(serializedResponse);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
+
+// End LocalProtobufService.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
new file mode 100644
index 0000000..c070ec0
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.Meta;
+
+import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.MissingResultsException;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.Timer;
+import org.apache.calcite.avatica.metrics.Timer.Context;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.calcite.avatica.remote.MetricsHelper.concat;
+
+/**
+ * Implementation of {@link Service} that talks to a local {@link Meta}.
+ */
+public class LocalService implements Service {
+ final Meta meta;
+ final MetricsSystem metrics;
+
+ private final Timer executeTimer;
+ private final Timer commitTimer;
+ private final Timer prepareTimer;
+ private final Timer prepareAndExecuteTimer;
+ private final Timer connectionSyncTimer;
+
+ private RpcMetadataResponse serverLevelRpcMetadata;
+
+ public LocalService(Meta meta) {
+ this(meta, NoopMetricsSystem.getInstance());
+ }
+
+ public LocalService(Meta meta, MetricsSystem metrics) {
+ this.meta = meta;
+ this.metrics = Objects.requireNonNull(metrics);
+
+ this.executeTimer = this.metrics.getTimer(name("Execute"));
+ this.commitTimer = this.metrics.getTimer(name("Commit"));
+ this.prepareTimer = this.metrics.getTimer(name("Prepare"));
+ this.prepareAndExecuteTimer = this.metrics.getTimer(name("PrepareAndExecute"));
+ this.connectionSyncTimer = this.metrics.getTimer(name("ConnectionSync"));
+ }
+
+ private static String name(String timer) {
+ return concat(LocalService.class, timer);
+ }
+
+ @Override public void setRpcMetadata(RpcMetadataResponse serverLevelRpcMetadata) {
+ this.serverLevelRpcMetadata = Objects.requireNonNull(serverLevelRpcMetadata);
+ }
+
+ private static <E> List<E> list(Iterable<E> iterable) {
+ if (iterable instanceof List) {
+ return (List<E>) iterable;
+ }
+ final List<E> rowList = new ArrayList<>();
+ for (E row : iterable) {
+ rowList.add(row);
+ }
+ return rowList;
+ }
+
+ /** Converts a result set (not serializable) into a serializable response. */
+ public ResultSetResponse toResponse(Meta.MetaResultSet resultSet) {
+ if (resultSet.updateCount != -1) {
+ return new ResultSetResponse(resultSet.connectionId,
+ resultSet.statementId, resultSet.ownStatement, null, null,
+ resultSet.updateCount, serverLevelRpcMetadata);
+ }
+
+ Meta.Signature signature = resultSet.signature;
+ Meta.CursorFactory cursorFactory = resultSet.signature.cursorFactory;
+ Meta.Frame frame = null;
+ int updateCount = -1;
+ final List<Object> list;
+
+ if (resultSet.firstFrame != null) {
+ list = list(resultSet.firstFrame.rows);
+ switch (cursorFactory.style) {
+ case ARRAY:
+ cursorFactory = Meta.CursorFactory.LIST;
+ break;
+ case MAP:
+ case LIST:
+ break;
+ case RECORD:
+ cursorFactory = Meta.CursorFactory.LIST;
+ break;
+ default:
+ cursorFactory = Meta.CursorFactory.map(cursorFactory.fieldNames);
+ }
+
+ final boolean done = resultSet.firstFrame.done;
+
+ frame = new Meta.Frame(0, done, list);
+ updateCount = -1;
+
+ if (signature.statementType != null) {
+ if (signature.statementType.canUpdate()) {
+ frame = null;
+ updateCount = ((Number) ((List) list.get(0)).get(0)).intValue();
+ }
+ }
+ } else {
+ //noinspection unchecked
+ list = (List<Object>) (List) list2(resultSet);
+ cursorFactory = Meta.CursorFactory.LIST;
+ }
+
+ if (cursorFactory != resultSet.signature.cursorFactory) {
+ signature = signature.setCursorFactory(cursorFactory);
+ }
+
+ return new ResultSetResponse(resultSet.connectionId, resultSet.statementId,
+ resultSet.ownStatement, signature, frame, updateCount, serverLevelRpcMetadata);
+ }
+
+ private List<List<Object>> list2(Meta.MetaResultSet resultSet) {
+ final Meta.StatementHandle h = new Meta.StatementHandle(
+ resultSet.connectionId, resultSet.statementId, null);
+ final List<TypedValue> parameterValues = Collections.emptyList();
+ final Iterable<Object> iterable = meta.createIterable(h, null,
+ resultSet.signature, parameterValues, resultSet.firstFrame);
+ final List<List<Object>> list = new ArrayList<>();
+ return MetaImpl.collect(resultSet.signature.cursorFactory, iterable, list);
+ }
+
+ public ResultSetResponse apply(CatalogsRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ final Meta.MetaResultSet resultSet = meta.getCatalogs(ch);
+ return toResponse(resultSet);
+ }
+
+ public ResultSetResponse apply(SchemasRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ final Meta.MetaResultSet resultSet =
+ meta.getSchemas(ch, request.catalog, Meta.Pat.of(request.schemaPattern));
+ return toResponse(resultSet);
+ }
+
+ public ResultSetResponse apply(TablesRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ final Meta.MetaResultSet resultSet =
+ meta.getTables(ch,
+ request.catalog,
+ Meta.Pat.of(request.schemaPattern),
+ Meta.Pat.of(request.tableNamePattern),
+ request.typeList);
+ return toResponse(resultSet);
+ }
+
+ public ResultSetResponse apply(TableTypesRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ final Meta.MetaResultSet resultSet = meta.getTableTypes(ch);
+ return toResponse(resultSet);
+ }
+
+ public ResultSetResponse apply(TypeInfoRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ final Meta.MetaResultSet resultSet = meta.getTypeInfo(ch);
+ return toResponse(resultSet);
+ }
+
+ public ResultSetResponse apply(ColumnsRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ final Meta.MetaResultSet resultSet =
+ meta.getColumns(ch,
+ request.catalog,
+ Meta.Pat.of(request.schemaPattern),
+ Meta.Pat.of(request.tableNamePattern),
+ Meta.Pat.of(request.columnNamePattern));
+ return toResponse(resultSet);
+ }
+
+ public PrepareResponse apply(PrepareRequest request) {
+ try (final Context ctx = prepareTimer.start()) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ final Meta.StatementHandle h =
+ meta.prepare(ch, request.sql, request.maxRowCount);
+ return new PrepareResponse(h, serverLevelRpcMetadata);
+ }
+ }
+
+ public ExecuteResponse apply(PrepareAndExecuteRequest request) {
+ try (final Context ctx = prepareAndExecuteTimer.start()) {
+ final Meta.StatementHandle sh =
+ new Meta.StatementHandle(request.connectionId, request.statementId, null);
+ try {
+ final Meta.ExecuteResult executeResult =
+ meta.prepareAndExecute(sh, request.sql, request.maxRowCount,
+ new Meta.PrepareCallback() {
+ @Override public Object getMonitor() {
+ return LocalService.class;
+ }
+
+ @Override public void clear() {
+ }
+
+ @Override public void assign(Meta.Signature signature,
+ Meta.Frame firstFrame, long updateCount) {
+ }
+
+ @Override public void execute() {
+ }
+ });
+ final List<ResultSetResponse> results = new ArrayList<>();
+ for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) {
+ results.add(toResponse(metaResultSet));
+ }
+ return new ExecuteResponse(results, false, serverLevelRpcMetadata);
+ } catch (NoSuchStatementException e) {
+ // The Statement doesn't exist anymore, bubble up this information
+ return new ExecuteResponse(null, true, serverLevelRpcMetadata);
+ }
+ }
+ }
+
+ public FetchResponse apply(FetchRequest request) {
+ final Meta.StatementHandle h = new Meta.StatementHandle(
+ request.connectionId, request.statementId, null);
+ try {
+ final Meta.Frame frame =
+ meta.fetch(h,
+ request.offset,
+ request.fetchMaxRowCount);
+ return new FetchResponse(frame, false, false, serverLevelRpcMetadata);
+ } catch (NullPointerException | NoSuchStatementException e) {
+ // The Statement doesn't exist anymore, bubble up this information
+ return new FetchResponse(null, true, true, serverLevelRpcMetadata);
+ } catch (MissingResultsException e) {
+ return new FetchResponse(null, false, true, serverLevelRpcMetadata);
+ }
+ }
+
+ public ExecuteResponse apply(ExecuteRequest request) {
+ try (final Context ctx = executeTimer.start()) {
+ try {
+ final Meta.ExecuteResult executeResult = meta.execute(request.statementHandle,
+ request.parameterValues, request.maxRowCount);
+
+ final List<ResultSetResponse> results = new ArrayList<>(executeResult.resultSets.size());
+ for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) {
+ results.add(toResponse(metaResultSet));
+ }
+ return new ExecuteResponse(results, false, serverLevelRpcMetadata);
+ } catch (NoSuchStatementException e) {
+ return new ExecuteResponse(null, true, serverLevelRpcMetadata);
+ }
+ }
+ }
+
+ public CreateStatementResponse apply(CreateStatementRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ final Meta.StatementHandle h = meta.createStatement(ch);
+ return new CreateStatementResponse(h.connectionId, h.id, serverLevelRpcMetadata);
+ }
+
+ public CloseStatementResponse apply(CloseStatementRequest request) {
+ final Meta.StatementHandle h = new Meta.StatementHandle(
+ request.connectionId, request.statementId, null);
+ meta.closeStatement(h);
+ return new CloseStatementResponse(serverLevelRpcMetadata);
+ }
+
+ public OpenConnectionResponse apply(OpenConnectionRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ meta.openConnection(ch, request.info);
+ return new OpenConnectionResponse(serverLevelRpcMetadata);
+ }
+
+ public CloseConnectionResponse apply(CloseConnectionRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ meta.closeConnection(ch);
+ return new CloseConnectionResponse(serverLevelRpcMetadata);
+ }
+
+ public ConnectionSyncResponse apply(ConnectionSyncRequest request) {
+ try (final Context ctx = connectionSyncTimer.start()) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ final Meta.ConnectionProperties connProps =
+ meta.connectionSync(ch, request.connProps);
+ return new ConnectionSyncResponse(connProps, serverLevelRpcMetadata);
+ }
+ }
+
+ public DatabasePropertyResponse apply(DatabasePropertyRequest request) {
+ final Meta.ConnectionHandle ch =
+ new Meta.ConnectionHandle(request.connectionId);
+ return new DatabasePropertyResponse(meta.getDatabaseProperties(ch), serverLevelRpcMetadata);
+ }
+
+ public SyncResultsResponse apply(SyncResultsRequest request) {
+ final Meta.StatementHandle h = new Meta.StatementHandle(
+ request.connectionId, request.statementId, null);
+ SyncResultsResponse response;
+ try {
+ // Set success on the cached statement
+ response = new SyncResultsResponse(meta.syncResults(h, request.state, request.offset), false,
+ serverLevelRpcMetadata);
+ } catch (NoSuchStatementException e) {
+ // Tried to sync results on a statement which wasn't cached
+ response = new SyncResultsResponse(false, true, serverLevelRpcMetadata);
+ }
+
+ return response;
+ }
+
+ public CommitResponse apply(CommitRequest request) {
+ try (final Context ctx = commitTimer.start()) {
+ meta.commit(new Meta.ConnectionHandle(request.connectionId));
+
+ // If commit() errors, let the ErrorResponse be sent back via an uncaught Exception.
+ return new CommitResponse();
+ }
+ }
+
+ public RollbackResponse apply(RollbackRequest request) {
+ meta.rollback(new Meta.ConnectionHandle(request.connectionId));
+
+ // If rollback() errors, let the ErrorResponse be sent back via an uncaught Exception.
+ return new RollbackResponse();
+ }
+}
+
+// End LocalService.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java
new file mode 100644
index 0000000..12a5b59
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.proto.Common;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+
+/**
+ * Identifies an operation from {@link DatabaseMetaData} which returns a {@link ResultSet}. This
+ * enum is used to allow clients to request the server to re-instantiate a {@link ResultSet} for
+ * these operations which do not have a SQL string associated with them as a normal query does.
+ */
+public enum MetaDataOperation {
+ GET_ATTRIBUTES,
+ GET_BEST_ROW_IDENTIFIER,
+ GET_CATALOGS,
+ GET_CLIENT_INFO_PROPERTIES,
+ GET_COLUMN_PRIVILEGES,
+ GET_COLUMNS,
+ GET_CROSS_REFERENCE,
+ GET_EXPORTED_KEYS,
+ GET_FUNCTION_COLUMNS,
+ GET_FUNCTIONS,
+ GET_IMPORTED_KEYS,
+ GET_INDEX_INFO,
+ GET_PRIMARY_KEYS,
+ GET_PROCEDURE_COLUMNS,
+ GET_PROCEDURES,
+ GET_PSEUDO_COLUMNS,
+ GET_SCHEMAS,
+ GET_SCHEMAS_WITH_ARGS,
+ GET_SUPER_TABLES,
+ GET_SUPER_TYPES,
+ GET_TABLE_PRIVILEGES,
+ GET_TABLES,
+ GET_TABLE_TYPES,
+ GET_TYPE_INFO,
+ GET_UDTS,
+ GET_VERSION_COLUMNS;
+
+ public Common.MetaDataOperation toProto() {
+ switch (this) {
+ case GET_ATTRIBUTES:
+ return Common.MetaDataOperation.GET_ATTRIBUTES;
+ case GET_BEST_ROW_IDENTIFIER:
+ return Common.MetaDataOperation.GET_BEST_ROW_IDENTIFIER;
+ case GET_CATALOGS:
+ return Common.MetaDataOperation.GET_CATALOGS;
+ case GET_CLIENT_INFO_PROPERTIES:
+ return Common.MetaDataOperation.GET_CLIENT_INFO_PROPERTIES;
+ case GET_COLUMNS:
+ return Common.MetaDataOperation.GET_COLUMNS;
+ case GET_COLUMN_PRIVILEGES:
+ return Common.MetaDataOperation.GET_COLUMN_PRIVILEGES;
+ case GET_CROSS_REFERENCE:
+ return Common.MetaDataOperation.GET_CROSS_REFERENCE;
+ case GET_EXPORTED_KEYS:
+ return Common.MetaDataOperation.GET_EXPORTED_KEYS;
+ case GET_FUNCTIONS:
+ return Common.MetaDataOperation.GET_FUNCTIONS;
+ case GET_FUNCTION_COLUMNS:
+ return Common.MetaDataOperation.GET_FUNCTION_COLUMNS;
+ case GET_IMPORTED_KEYS:
+ return Common.MetaDataOperation.GET_IMPORTED_KEYS;
+ case GET_INDEX_INFO:
+ return Common.MetaDataOperation.GET_INDEX_INFO;
+ case GET_PRIMARY_KEYS:
+ return Common.MetaDataOperation.GET_PRIMARY_KEYS;
+ case GET_PROCEDURES:
+ return Common.MetaDataOperation.GET_PROCEDURES;
+ case GET_PROCEDURE_COLUMNS:
+ return Common.MetaDataOperation.GET_PROCEDURE_COLUMNS;
+ case GET_PSEUDO_COLUMNS:
+ return Common.MetaDataOperation.GET_PSEUDO_COLUMNS;
+ case GET_SCHEMAS:
+ return Common.MetaDataOperation.GET_SCHEMAS;
+ case GET_SCHEMAS_WITH_ARGS:
+ return Common.MetaDataOperation.GET_SCHEMAS_WITH_ARGS;
+ case GET_SUPER_TABLES:
+ return Common.MetaDataOperation.GET_SUPER_TABLES;
+ case GET_SUPER_TYPES:
+ return Common.MetaDataOperation.GET_SUPER_TYPES;
+ case GET_TABLES:
+ return Common.MetaDataOperation.GET_TABLES;
+ case GET_TABLE_PRIVILEGES:
+ return Common.MetaDataOperation.GET_TABLE_PRIVILEGES;
+ case GET_TABLE_TYPES:
+ return Common.MetaDataOperation.GET_TABLE_TYPES;
+ case GET_TYPE_INFO:
+ return Common.MetaDataOperation.GET_TYPE_INFO;
+ case GET_UDTS:
+ return Common.MetaDataOperation.GET_UDTS;
+ case GET_VERSION_COLUMNS:
+ return Common.MetaDataOperation.GET_VERSION_COLUMNS;
+ default:
+ throw new RuntimeException("Unknown type: " + this);
+ }
+ }
+
+ public static MetaDataOperation fromProto(Common.MetaDataOperation protoOp) {
+ // Null is acceptable
+ if (null == protoOp) {
+ return null;
+ }
+
+ switch (protoOp) {
+ case GET_ATTRIBUTES:
+ return MetaDataOperation.GET_ATTRIBUTES;
+ case GET_BEST_ROW_IDENTIFIER:
+ return MetaDataOperation.GET_BEST_ROW_IDENTIFIER;
+ case GET_CATALOGS:
+ return MetaDataOperation.GET_CATALOGS;
+ case GET_CLIENT_INFO_PROPERTIES:
+ return MetaDataOperation.GET_CLIENT_INFO_PROPERTIES;
+ case GET_COLUMNS:
+ return MetaDataOperation.GET_COLUMNS;
+ case GET_COLUMN_PRIVILEGES:
+ return MetaDataOperation.GET_COLUMN_PRIVILEGES;
+ case GET_CROSS_REFERENCE:
+ return MetaDataOperation.GET_CROSS_REFERENCE;
+ case GET_EXPORTED_KEYS:
+ return MetaDataOperation.GET_EXPORTED_KEYS;
+ case GET_FUNCTIONS:
+ return MetaDataOperation.GET_FUNCTIONS;
+ case GET_FUNCTION_COLUMNS:
+ return MetaDataOperation.GET_FUNCTION_COLUMNS;
+ case GET_IMPORTED_KEYS:
+ return MetaDataOperation.GET_IMPORTED_KEYS;
+ case GET_INDEX_INFO:
+ return MetaDataOperation.GET_INDEX_INFO;
+ case GET_PRIMARY_KEYS:
+ return MetaDataOperation.GET_PRIMARY_KEYS;
+ case GET_PROCEDURES:
+ return MetaDataOperation.GET_PROCEDURES;
+ case GET_PROCEDURE_COLUMNS:
+ return MetaDataOperation.GET_PROCEDURE_COLUMNS;
+ case GET_PSEUDO_COLUMNS:
+ return MetaDataOperation.GET_PSEUDO_COLUMNS;
+ case GET_SCHEMAS:
+ return MetaDataOperation.GET_SCHEMAS;
+ case GET_SCHEMAS_WITH_ARGS:
+ return MetaDataOperation.GET_SCHEMAS_WITH_ARGS;
+ case GET_SUPER_TABLES:
+ return MetaDataOperation.GET_SUPER_TABLES;
+ case GET_SUPER_TYPES:
+ return MetaDataOperation.GET_SUPER_TYPES;
+ case GET_TABLES:
+ return MetaDataOperation.GET_TABLES;
+ case GET_TABLE_PRIVILEGES:
+ return MetaDataOperation.GET_TABLE_PRIVILEGES;
+ case GET_TABLE_TYPES:
+ return MetaDataOperation.GET_TABLE_TYPES;
+ case GET_TYPE_INFO:
+ return MetaDataOperation.GET_TYPE_INFO;
+ case GET_UDTS:
+ return MetaDataOperation.GET_UDTS;
+ case GET_VERSION_COLUMNS:
+ return MetaDataOperation.GET_VERSION_COLUMNS;
+ default:
+ throw new RuntimeException("Unknown type: " + protoOp);
+ }
+ }
+}
+
+// End MetaDataOperation.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java
new file mode 100644
index 0000000..2561b29
--- /dev/null
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+/**
+ * A utility class to encapsulate common logic in use of metrics implementation.
+ */
+public class MetricsHelper {
+
+ private static final String PERIOD = ".";
+
+ private MetricsHelper() {}
+
+ public static String concat(Class<?> clz, String name) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(clz.getName());
+ return sb.append(PERIOD).append(name).toString();
+ }
+
+}
+
+// End MetricsHelper.java