You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by el...@apache.org on 2016/03/07 19:28:04 UTC
[26/59] [partial] calcite git commit: [CALCITE-1078] Detach avatica
from the core calcite Maven project
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
new file mode 100644
index 0000000..ff27d05
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+import java.util.Objects;
+
+/**
+ * All we know about a statement. Encapsulates a {@link ResultSet}.
+ */
+public class StatementInfo {
+ private volatile Boolean relativeSupported = null;
+
+ final Statement statement; // sometimes a PreparedStatement
+ private ResultSet resultSet;
+ private long position = 0;
+
+ // True when setResultSet(ResultSet) is called to let us determine the difference between
+ // a null ResultSet (from an update) from the lack of a ResultSet.
+ private boolean resultsInitialized = false;
+
+ public StatementInfo(Statement statement) {
+ this.statement = Objects.requireNonNull(statement);
+ }
+
+ // Visible for testing
+ void setPosition(long position) {
+ this.position = position;
+ }
+
+ // Visible for testing
+ long getPosition() {
+ return this.position;
+ }
+
+ /**
+ * Set a ResultSet on this object.
+ *
+ * @param resultSet The current ResultSet
+ */
+ public void setResultSet(ResultSet resultSet) {
+ resultsInitialized = true;
+ this.resultSet = resultSet;
+ }
+
+ /**
+ * @return The {@link ResultSet} for this Statement, may be null.
+ */
+ public ResultSet getResultSet() {
+ return this.resultSet;
+ }
+
+ /**
+ * @return True if {@link #setResultSet(ResultSet)} was ever invoked.
+ */
+ public boolean isResultSetInitialized() {
+ return resultsInitialized;
+ }
+
+ /**
+ * @see ResultSet#next()
+ */
+ public boolean next() throws SQLException {
+ return _next(resultSet);
+ }
+
+ boolean _next(ResultSet results) throws SQLException {
+ boolean ret = results.next();
+ position++;
+ return ret;
+ }
+
+ /**
+ * Consumes <code>offset - position</code> elements from the {@link ResultSet}.
+ *
+ * @param offset The offset to advance to
+ * @return True if the resultSet was advanced to the current point, false if insufficient rows
+ * were present to advance to the requested offset.
+ */
+ public boolean advanceResultSetToOffset(ResultSet results, long offset) throws SQLException {
+ if (offset < 0 || offset < position) {
+ throw new IllegalArgumentException("Offset should be "
+ + " non-negative and not less than the current position. " + offset + ", " + position);
+ }
+ if (position >= offset) {
+ return true;
+ }
+
+ if (null == relativeSupported) {
+ Boolean moreResults = null;
+ synchronized (this) {
+ if (null == relativeSupported) {
+ try {
+ moreResults = advanceByRelative(results, offset);
+ relativeSupported = true;
+ } catch (SQLFeatureNotSupportedException e) {
+ relativeSupported = false;
+ }
+ }
+ }
+
+ if (null != moreResults) {
+ // We figured out whether or not relative is supported.
+ // Make sure we actually do the necessary work.
+ if (!relativeSupported) {
+ // We avoided calling advanceByNext in the synchronized block earlier.
+ moreResults = advanceByNext(results, offset);
+ }
+
+ return moreResults;
+ }
+
+ // Another thread updated the RELATIVE_SUPPORTED before we did, fall through.
+ }
+
+ if (relativeSupported) {
+ return advanceByRelative(results, offset);
+ } else {
+ return advanceByNext(results, offset);
+ }
+ }
+
+ private boolean advanceByRelative(ResultSet results, long offset) throws SQLException {
+ long diff = offset - position;
+ while (diff > Integer.MAX_VALUE) {
+ if (!results.relative(Integer.MAX_VALUE)) {
+ // Avoid updating position until relative succeeds.
+ position += Integer.MAX_VALUE;
+ return false;
+ }
+ // Avoid updating position until relative succeeds.
+ position += Integer.MAX_VALUE;
+ diff -= Integer.MAX_VALUE;
+ }
+ boolean ret = results.relative((int) diff);
+ // Make sure we only update the position after successfully calling relative(int).
+ position += diff;
+ return ret;
+ }
+
+ private boolean advanceByNext(ResultSet results, long offset) throws SQLException {
+ while (position < offset) {
+ // Advance while maintaining `position`
+ if (!_next(results)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
+
+// End StatementInfo.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java
new file mode 100644
index 0000000..8b8fb76
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Implements an Avatica provider on top of an existing JDBC data source. */
+package org.apache.calcite.avatica.jdbc;
+
+
+// End package-info.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
new file mode 100644
index 0000000..42b13c9
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+
+import org.eclipse.jetty.server.Handler;
+
+/**
+ * A custom interface that extends the Jetty interface to enable extra control within Avatica.
+ *
+ * <p>On top of the inherited Jetty {@link Handler} contract it adds one method through which
+ * an {@link RpcMetadataResponse} can be handed to the handler.
+ */
+public interface AvaticaHandler extends Handler {
+
+ /**
+  * Hands the given RPC metadata to this handler.
+  *
+  * @param metadata the metadata to pass to the handler
+  */
+ void setServerRpcMetadata(RpcMetadataResponse metadata);
+
+}
+
+// End AvaticaHandler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
new file mode 100644
index 0000000..34a9333
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.AvaticaUtils;
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.Timer;
+import org.apache.calcite.avatica.metrics.Timer.Context;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
+import org.apache.calcite.avatica.remote.Handler.HandlerResponse;
+import org.apache.calcite.avatica.remote.JsonHandler;
+import org.apache.calcite.avatica.remote.Service;
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.calcite.avatica.remote.MetricsHelper.concat;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Jetty handler that executes Avatica JSON request-responses.
+ *
+ * <p>Only {@code POST} requests are acted upon. The request payload is taken from the
+ * {@code request} HTTP header when present and from the request body otherwise, passed to a
+ * {@link JsonHandler}, and the handler's status code and response are written back.
+ */
+public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareAvaticaHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class);
+
+  final Service service;
+  final JsonHandler jsonHandler;
+
+  final MetricsSystem metrics;
+  final Timer requestTimer;
+
+  final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer;
+
+  /** Creates a handler for the given service that uses the no-op metrics system. */
+  public AvaticaJsonHandler(Service service) {
+    this(service, NoopMetricsSystem.getInstance());
+  }
+
+  /**
+   * Creates a handler for the given service.
+   *
+   * @param service The service requests are dispatched to, must not be null
+   * @param metrics The metrics system the request timer is obtained from, must not be null
+   */
+  public AvaticaJsonHandler(Service service, MetricsSystem metrics) {
+    // Avatica doesn't have a Guava dependency
+    this.service = Objects.requireNonNull(service);
+    this.metrics = Objects.requireNonNull(metrics);
+    this.jsonHandler = new JsonHandler(service, this.metrics);
+
+    // Metrics
+    final String timerName =
+        concat(AvaticaJsonHandler.class, MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME);
+    this.requestTimer = this.metrics.getTimer(timerName);
+
+    this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() {
+      @Override public UnsynchronizedBuffer initialValue() {
+        return new UnsynchronizedBuffer();
+      }
+    };
+  }
+
+  /**
+   * Handles one HTTP request. The content type and an OK status are always set; anything
+   * other than a {@code POST} is then left unhandled.
+   */
+  @Override public void handle(String target, Request baseRequest,
+      HttpServletRequest request, HttpServletResponse response)
+      throws IOException, ServletException {
+    try (final Context ctx = requestTimer.start()) {
+      response.setContentType("application/json;charset=utf-8");
+      response.setStatus(HttpServletResponse.SC_OK);
+      if (!request.getMethod().equals("POST")) {
+        return;
+      }
+
+      // First look for a request in the header, then look in the body.
+      // The latter allows very large requests without hitting HTTP 413.
+      String rawRequest = request.getHeader("request");
+      if (rawRequest == null) {
+        rawRequest = readBody(request);
+      }
+      final byte[] rawBytes = rawRequest.getBytes("ISO-8859-1");
+      final String jsonRequest = new String(rawBytes, "UTF-8");
+      LOG.trace("request: {}", jsonRequest);
+
+      final HandlerResponse<String> jsonResponse = jsonHandler.apply(jsonRequest);
+      LOG.trace("response: {}", jsonResponse);
+      baseRequest.setHandled(true);
+      // Set the status code and write out the response.
+      response.setStatus(jsonResponse.getStatusCode());
+      response.getWriter().println(jsonResponse.getResponse());
+    }
+  }
+
+  /**
+   * Reads the complete request body using this thread's reusable buffer, which avoids a new
+   * buffer creation for every HTTP request.
+   */
+  private String readBody(HttpServletRequest request) throws IOException {
+    final UnsynchronizedBuffer scratch = threadLocalBuffer.get();
+    try (ServletInputStream body = request.getInputStream()) {
+      return AvaticaUtils.readFully(body, scratch);
+    } finally {
+      // Reset the offset into the buffer after we're done
+      scratch.reset();
+    }
+  }
+
+  @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) {
+    // Set the metadata for the normal service calls
+    service.setRpcMetadata(metadata);
+    // Also add it to the handler to include with exceptions
+    jsonHandler.setRpcMetadata(metadata);
+  }
+
+  @Override public MetricsSystem getMetrics() {
+    return metrics;
+  }
+}
+
+// End AvaticaJsonHandler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
new file mode 100644
index 0000000..27e73de
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.AvaticaUtils;
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.Timer;
+import org.apache.calcite.avatica.metrics.Timer.Context;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
+import org.apache.calcite.avatica.remote.Handler.HandlerResponse;
+import org.apache.calcite.avatica.remote.MetricsHelper;
+import org.apache.calcite.avatica.remote.ProtobufHandler;
+import org.apache.calcite.avatica.remote.ProtobufTranslation;
+import org.apache.calcite.avatica.remote.ProtobufTranslationImpl;
+import org.apache.calcite.avatica.remote.Service;
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+import org.apache.calcite.avatica.util.UnsynchronizedBuffer;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Jetty handler that executes Avatica Protocol Buffers request-responses.
+ *
+ * <p>Only {@code POST} requests are acted upon: the request body is read as raw bytes, passed
+ * to a {@link ProtobufHandler}, and the handler's status code and response bytes are written
+ * back.
+ */
+public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAwareAvaticaHandler {
+  // Created for this class; it was previously created for AvaticaJsonHandler by mistake.
+  private static final Logger LOG = LoggerFactory.getLogger(AvaticaProtobufHandler.class);
+
+  private final Service service;
+  private final ProtobufHandler pbHandler;
+  private final ProtobufTranslation protobufTranslation;
+  private final MetricsSystem metrics;
+  private final Timer requestTimer;
+
+  final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer;
+
+  /** Creates a handler for the given service that uses the no-op metrics system. */
+  public AvaticaProtobufHandler(Service service) {
+    this(service, NoopMetricsSystem.getInstance());
+  }
+
+  /**
+   * Creates a handler for the given service.
+   *
+   * @param service The service requests are dispatched to, must not be null
+   * @param metrics The metrics system the request timer is obtained from, must not be null
+   */
+  public AvaticaProtobufHandler(Service service, MetricsSystem metrics) {
+    this.service = Objects.requireNonNull(service);
+    this.metrics = Objects.requireNonNull(metrics);
+
+    this.requestTimer = this.metrics.getTimer(
+        MetricsHelper.concat(AvaticaProtobufHandler.class,
+            MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME));
+
+    this.protobufTranslation = new ProtobufTranslationImpl();
+    this.pbHandler = new ProtobufHandler(service, protobufTranslation, metrics);
+
+    this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() {
+      @Override public UnsynchronizedBuffer initialValue() {
+        return new UnsynchronizedBuffer();
+      }
+    };
+  }
+
+  /**
+   * Handles one HTTP request. The content type and an OK status are always set; anything
+   * other than a {@code POST} is then left unhandled.
+   */
+  @Override public void handle(String target, Request baseRequest,
+      HttpServletRequest request, HttpServletResponse response)
+      throws IOException, ServletException {
+    try (final Context ctx = this.requestTimer.start()) {
+      response.setContentType("application/octet-stream;charset=utf-8");
+      response.setStatus(HttpServletResponse.SC_OK);
+      if (request.getMethod().equals("POST")) {
+        byte[] requestBytes;
+        // Avoid a new buffer creation for every HTTP request
+        final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
+        try (ServletInputStream inputStream = request.getInputStream()) {
+          requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer);
+        } finally {
+          // Reset the buffer so the next request on this thread starts from a clean state
+          buffer.reset();
+        }
+
+        HandlerResponse<byte[]> handlerResponse = pbHandler.apply(requestBytes);
+
+        baseRequest.setHandled(true);
+        // Set the status code and write out the response.
+        response.setStatus(handlerResponse.getStatusCode());
+        response.getOutputStream().write(handlerResponse.getResponse());
+      }
+    }
+  }
+
+  @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) {
+    // Set the metadata for the normal service calls
+    service.setRpcMetadata(metadata);
+    // Also add it to the handler to include with exceptions
+    pbHandler.setRpcMetadata(metadata);
+  }
+
+  @Override public MetricsSystem getMetrics() {
+    return this.metrics;
+  }
+
+}
+
+// End AvaticaProtobufHandler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java
new file mode 100644
index 0000000..a574985
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * An AvaticaHandler implementation that delegates to a provided Jetty Handler instance.
+ *
+ * <p>Every Jetty {@link Handler} and life-cycle method is forwarded unchanged to the wrapped
+ * instance. The one Avatica-specific method,
+ * {@link #setServerRpcMetadata(org.apache.calcite.avatica.remote.Service.RpcMetadataResponse)},
+ * does not store the metadata; it only logs a warning.
+ *
+ * <p>Does not implement {@link MetricsAwareAvaticaHandler} as this implementation is only
+ * presented for backwards compatibility.
+ */
+public class DelegatingAvaticaHandler implements AvaticaHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(DelegatingAvaticaHandler.class);
+
+  /** The wrapped Jetty handler that receives every forwarded call. */
+  private final Handler delegate;
+
+  /**
+   * Wraps the given Jetty handler.
+   *
+   * @param handler The handler to forward to, must not be null
+   */
+  public DelegatingAvaticaHandler(Handler handler) {
+    this.delegate = Objects.requireNonNull(handler);
+  }
+
+  // ---- Avatica-specific ----
+
+  @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) {
+    LOG.warn("Setting RpcMetadata is not implemented for DelegatingAvaticaHandler");
+  }
+
+  // ---- Jetty Handler methods, forwarded as-is ----
+
+  @Override public void handle(String target, Request baseRequest, HttpServletRequest request,
+      HttpServletResponse response) throws IOException, ServletException {
+    delegate.handle(target, baseRequest, request, response);
+  }
+
+  @Override public Server getServer() {
+    return delegate.getServer();
+  }
+
+  @Override public void setServer(Server server) {
+    delegate.setServer(server);
+  }
+
+  @Override public void destroy() {
+    delegate.destroy();
+  }
+
+  // ---- Life-cycle transitions, forwarded as-is ----
+
+  @Override public void start() throws Exception {
+    delegate.start();
+  }
+
+  @Override public void stop() throws Exception {
+    delegate.stop();
+  }
+
+  // ---- Life-cycle state queries, forwarded as-is ----
+
+  @Override public boolean isStarting() {
+    return delegate.isStarting();
+  }
+
+  @Override public boolean isStarted() {
+    return delegate.isStarted();
+  }
+
+  @Override public boolean isRunning() {
+    return delegate.isRunning();
+  }
+
+  @Override public boolean isStopping() {
+    return delegate.isStopping();
+  }
+
+  @Override public boolean isStopped() {
+    return delegate.isStopped();
+  }
+
+  @Override public boolean isFailed() {
+    return delegate.isFailed();
+  }
+
+  // ---- Life-cycle listeners, forwarded as-is ----
+
+  @Override public void addLifeCycleListener(Listener listener) {
+    delegate.addLifeCycleListener(listener);
+  }
+
+  @Override public void removeLifeCycleListener(Listener listener) {
+    delegate.removeLifeCycleListener(listener);
+  }
+
+}
+
+// End DelegatingAvaticaHandler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
new file mode 100644
index 0000000..b1fcb40
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+import org.apache.calcite.avatica.metrics.MetricsSystemConfiguration;
+import org.apache.calcite.avatica.metrics.MetricsSystemFactory;
+import org.apache.calcite.avatica.metrics.MetricsSystemLoader;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
+import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystemConfiguration;
+import org.apache.calcite.avatica.remote.Driver;
+import org.apache.calcite.avatica.remote.Service;
+
+import org.eclipse.jetty.server.Handler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.ServiceLoader;
+
+/**
+ * Factory that instantiates the desired implementation, typically differing on the method
+ * used to serialize messages, for use in the Avatica server.
+ */
+public class HandlerFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(HandlerFactory.class);
+
+  /**
+   * Constructs the desired implementation for the given serialization method, using the
+   * no-op metrics configuration.
+   *
+   * @param service The underlying {@link Service}.
+   * @param serialization The desired message serialization.
+   * @return The {@link Handler}.
+   */
+  public Handler getHandler(Service service, Driver.Serialization serialization) {
+    final MetricsSystemConfiguration<?> noMetrics = NoopMetricsSystemConfiguration.getInstance();
+    return getHandler(service, serialization, noMetrics);
+  }
+
+  /**
+   * Constructs the desired implementation for the given serialization method with metrics.
+   *
+   * @param service The underlying {@link Service}.
+   * @param serialization The desired message serialization.
+   * @param metricsConfig Configuration for the {@link MetricsSystem}, must not be null.
+   * @return The {@link Handler}.
+   * @throws IllegalArgumentException if there is no handler for the serialization.
+   */
+  public Handler getHandler(Service service, Driver.Serialization serialization,
+      MetricsSystemConfiguration<?> metricsConfig) {
+    // The metrics system is loaded before the serialization is looked at.
+    final MetricsSystemConfiguration<?> checkedConfig = Objects.requireNonNull(metricsConfig);
+    final MetricsSystem metrics = MetricsSystemLoader.load(checkedConfig);
+
+    switch (serialization) {
+    case JSON:
+      return new AvaticaJsonHandler(service, metrics);
+    case PROTOBUF:
+      return new AvaticaProtobufHandler(service, metrics);
+    default:
+      throw new IllegalArgumentException("Unknown Avatica handler for " + serialization.name());
+    }
+  }
+
+  /**
+   * Load a {@link MetricsSystem} using ServiceLoader to create a {@link MetricsSystemFactory}.
+   *
+   * <p>Exactly one discovered factory is used to create the metrics system. Zero factories,
+   * or more than one, both result in the no-op metrics system. Note that
+   * {@link #getHandler(Service, Driver.Serialization, MetricsSystemConfiguration)} obtains its
+   * metrics system from {@link MetricsSystemLoader} rather than from this method.
+   *
+   * @param config State to pass to the factory for initialization.
+   * @return A {@link MetricsSystem} instance.
+   */
+  MetricsSystem loadMetricsSystem(MetricsSystemConfiguration<?> config) {
+    final List<MetricsSystemFactory> found = new ArrayList<>();
+    for (MetricsSystemFactory candidate : ServiceLoader.load(MetricsSystemFactory.class)) {
+      found.add(candidate);
+    }
+
+    switch (found.size()) {
+    case 0:
+      // None-provided default to no metrics
+      LOG.info("No metrics implementation available on classpath. Using No-op implementation");
+      return NoopMetricsSystem.getInstance();
+    case 1:
+      // One and only one instance -- what we want
+      final MetricsSystemFactory only = found.get(0);
+      LOG.info("Loaded MetricsSystem {}", only.getClass());
+      return only.create(config);
+    default:
+      // Ambiguous: tell the user which implementations were found and use none of them.
+      final StringBuilder names = new StringBuilder();
+      String separator = "";
+      for (MetricsSystemFactory candidate : found) {
+        names.append(separator).append(candidate.getClass());
+        separator = ", ";
+      }
+      LOG.warn("Found multiple MetricsSystemFactory implementations: {}."
+          + " Using No-op implementation", names);
+      return NoopMetricsSystem.getInstance();
+    }
+  }
+}
+
+// End HandlerFactory.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
new file mode 100644
index 0000000..c81e899
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.DefaultHandler;
+import org.eclipse.jetty.server.handler.HandlerList;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Avatica HTTP server.
+ *
+ * <p>Runs an embedded Jetty {@link Server} whose handler list is the supplied
+ * {@link AvaticaHandler} followed by a {@link DefaultHandler}.
+ *
+ * <p>If you need to change the server's configuration, override the
+ * {@link #configureConnector(ServerConnector, int)} method in a derived class.
+ */
+public class HttpServer {
+  private static final Logger LOG = LoggerFactory.getLogger(HttpServer.class);
+
+  /** The Jetty server created by {@link #start()}; null until then and after {@link #stop()}. */
+  private Server server;
+  /** Port given to the constructor; replaced by the bound port on start and by -1 on stop. */
+  private int port = -1;
+  private final AvaticaHandler handler;
+
+  /**
+   * Creates a server for a plain Jetty handler, requesting port 0.
+   *
+   * @param handler Jetty handler; wrapped unless it already is an {@link AvaticaHandler}
+   * @deprecated Use {@link #HttpServer(AvaticaHandler)}.
+   */
+  @Deprecated
+  public HttpServer(Handler handler) {
+    this(wrapJettyHandler(handler));
+  }
+
+  /**
+   * Creates a server requesting port 0; the port actually bound is available from
+   * {@link #getPort()} once {@link #start()} has returned.
+   *
+   * @param handler handler that serves the requests
+   */
+  public HttpServer(AvaticaHandler handler) {
+    this(0, handler);
+  }
+
+  /**
+   * Creates a server for a plain Jetty handler on the given port.
+   *
+   * @param port port to listen on
+   * @param handler Jetty handler; wrapped unless it already is an {@link AvaticaHandler}
+   * @deprecated Use {@link #HttpServer(int, AvaticaHandler)}.
+   */
+  @Deprecated
+  public HttpServer(int port, Handler handler) {
+    this(port, wrapJettyHandler(handler));
+  }
+
+  /**
+   * Creates a server on the given port. Nothing is bound until {@link #start()} is called.
+   *
+   * @param port port to listen on
+   * @param handler handler that serves the requests
+   */
+  public HttpServer(int port, AvaticaHandler handler) {
+    this.port = port;
+    this.handler = handler;
+  }
+
+  /** Returns the handler itself if it is an AvaticaHandler, otherwise a delegating wrapper. */
+  private static AvaticaHandler wrapJettyHandler(Handler handler) {
+    if (handler instanceof AvaticaHandler) {
+      return (AvaticaHandler) handler;
+    }
+    // Backwards compatibility, noop's the AvaticaHandler interface
+    return new DelegatingAvaticaHandler(handler);
+  }
+
+  /**
+   * Creates, configures and starts the Jetty server, then publishes the server's address to
+   * the handler.
+   *
+   * @throws RuntimeException if the server is already started, if Jetty fails to start, or if
+   *     the local host name cannot be resolved
+   */
+  public void start() {
+    if (server != null) {
+      throw new RuntimeException("Server is already started");
+    }
+
+    final QueuedThreadPool threadPool = new QueuedThreadPool();
+    threadPool.setDaemon(true);
+    server = new Server(threadPool);
+    server.manage(threadPool);
+
+    final ServerConnector connector = configureConnector(new ServerConnector(server), port);
+
+    server.setConnectors(new Connector[] { connector });
+
+    final HandlerList handlerList = new HandlerList();
+    handlerList.setHandlers(new Handler[] { handler, new DefaultHandler() });
+    server.setHandler(handlerList);
+    try {
+      server.start();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    // Read the port back from the connector: it differs from the requested one when the
+    // requested port was 0.
+    port = connector.getLocalPort();
+
+    LOG.info("Service listening on port {}.", getPort());
+
+    // Set the information about the address for this server
+    try {
+      this.handler.setServerRpcMetadata(createRpcServerMetadata(connector));
+    } catch (UnknownHostException e) {
+      // Failed to do the DNS lookup, bail out.
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Builds the metadata that carries this server's {@code host:port} address.
+   *
+   * @param connector the connector the server is listening on
+   * @return metadata holding the address
+   * @throws UnknownHostException if no host is configured and the local host cannot be resolved
+   */
+  private RpcMetadataResponse createRpcServerMetadata(ServerConnector connector) throws
+      UnknownHostException {
+    String host = connector.getHost();
+    if (null == host) {
+      // "null" means binding to all interfaces, we need to pick one so the client gets a real
+      // address and not "0.0.0.0" or similar.
+      host = InetAddress.getLocalHost().getHostName();
+    }
+
+    // Plain concatenation instead of String.format("%s:%d", ...): "%d" is localized using the
+    // default locale and may produce non-ASCII digits, which is wrong for a network address.
+    return new RpcMetadataResponse(host + ":" + connector.getLocalPort());
+  }
+
+  /**
+   * Configures the server connector.
+   *
+   * <p>The default configuration sets a timeout of 1 minute and disables
+   * TCP linger time.
+   *
+   * <p>To change the configuration, override this method in a derived class.
+   * The overriding method must call its super method.
+   *
+   * @param connector connector to be configured
+   * @param port port number handed over in constructor
+   * @return the configured connector
+   */
+  protected ServerConnector configureConnector(ServerConnector connector, int port) {
+    connector.setIdleTimeout(60 * 1000);
+    connector.setSoLingerTime(-1);
+    connector.setPort(port);
+    return connector;
+  }
+
+  /**
+   * Stops the Jetty server.
+   *
+   * @throws RuntimeException if the server is not running or Jetty fails to stop
+   */
+  public void stop() {
+    if (server == null) {
+      throw new RuntimeException("Server is already stopped");
+    }
+
+    LOG.info("Service terminating.");
+    try {
+      // Reset the fields first so this object reads as stopped even if Jetty's stop() throws.
+      final Server server1 = server;
+      port = -1;
+      server = null;
+      server1.stop();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Blocks by delegating to the Jetty server's {@code join()}.
+   *
+   * @throws InterruptedException if the calling thread is interrupted while waiting
+   * @throws IllegalStateException if the server has not been started or was already stopped
+   */
+  public void join() throws InterruptedException {
+    // Work on a local copy so a concurrent stop() cannot null the reference between the
+    // check and the call.
+    final Server current = server;
+    if (current == null) {
+      throw new IllegalStateException("Server is not started");
+    }
+    current.join();
+  }
+
+  /**
+   * Returns the port of this server.
+   *
+   * @return the port given to the constructor before {@link #start()}, the bound port while
+   *     running, or -1 after {@link #stop()}
+   */
+  public int getPort() {
+    return port;
+  }
+}
+
+// End HttpServer.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java
new file mode 100644
index 0000000..8b05931
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.remote.LocalService;
+import org.apache.calcite.avatica.remote.Service;
+
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import java.util.Arrays;
+
+/**
+ * Command-line entry point that creates and starts an Avatica {@link HttpServer}.
+ *
+ * <p>The first argument names the {@link org.apache.calcite.avatica.Meta.Factory}
+ * implementation to instantiate; the remaining arguments are handed to that factory.
+ */
+public class Main {
+  private Main() {}
+
+  /**
+   * Starts a server with the default settings of {@link #start(String[])} and then waits on
+   * {@link HttpServer#join()}.
+   *
+   * @param args Command-line arguments, see {@link #start(String[])}
+   */
+  public static void main(String[] args)
+      throws InterruptedException, ClassNotFoundException,
+      IllegalAccessException, InstantiationException {
+    HttpServer server = start(args);
+    server.join();
+  }
+
+  /**
+   * Factory that instantiates Jetty Handlers
+   */
+  public interface HandlerFactory {
+    /**
+     * Creates the handler that will serve requests for the given service.
+     *
+     * @param service service the handler dispatches to
+     * @return a new handler
+     */
+    AbstractHandler createHandler(Service service);
+  }
+
+  /** Handler factory used by {@link #start(String[])}; creates {@link AvaticaJsonHandler}s. */
+  private static final HandlerFactory JSON_HANDLER_FACTORY = new HandlerFactory() {
+    @Override public AbstractHandler createHandler(Service service) {
+      return new AvaticaJsonHandler(service);
+    }
+  };
+
+  /**
+   * Creates and starts an {@link HttpServer} using JSON POJO serialization of requests/responses.
+   * The server is bound to port 8765.
+   *
+   * <p>Arguments are as follows:
+   * <ul>
+   * <li>args[0]: the {@link org.apache.calcite.avatica.Meta.Factory} class
+   * name
+   * <li>args[1+]: arguments passed along to
+   * {@link org.apache.calcite.avatica.Meta.Factory#create(java.util.List)}
+   * </ul>
+   *
+   * @param args Command-line arguments
+   * @return the started server
+   * @throws IllegalArgumentException if {@code args} is null or empty
+   */
+  public static HttpServer start(String[] args) throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    return start(args, 8765, JSON_HANDLER_FACTORY);
+  }
+
+  /**
+   * Creates and starts an {@link HttpServer} using the given factory to create the Handler.
+   *
+   * <p>Arguments are as follows:
+   * <ul>
+   * <li>args[0]: the {@link org.apache.calcite.avatica.Meta.Factory} class
+   * name
+   * <li>args[1+]: arguments passed along to
+   * {@link org.apache.calcite.avatica.Meta.Factory#create(java.util.List)}
+   * </ul>
+   *
+   * @param args Command-line arguments
+   * @param port Server port to bind
+   * @param handlerFactory Factory to create the handler used by the server
+   * @return the started server
+   * @throws IllegalArgumentException if {@code args} is null or empty
+   * @throws ClassNotFoundException if the class named by args[0] cannot be loaded
+   * @throws InstantiationException if the factory class cannot be instantiated
+   * @throws IllegalAccessException if the factory class or its no-arg constructor is
+   *     not accessible
+   */
+  public static HttpServer start(String[] args, int port, HandlerFactory handlerFactory)
+      throws ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
+    // Fail with a usable message instead of an ArrayIndexOutOfBoundsException on args[0].
+    if (args == null || args.length == 0) {
+      throw new IllegalArgumentException(
+          "Expected the Meta.Factory class name as the first argument");
+    }
+    String factoryClassName = args[0];
+    Class<?> factoryClass = Class.forName(factoryClassName);
+    Meta.Factory factory = (Meta.Factory) factoryClass.newInstance();
+    // Everything after the class name belongs to the Meta.Factory.
+    Meta meta = factory.create(Arrays.asList(args).subList(1, args.length));
+    Service service = new LocalService(meta);
+    HttpServer server = new HttpServer(port, handlerFactory.createHandler(service));
+    server.start();
+    return server;
+  }
+}
+
+// End Main.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java
new file mode 100644
index 0000000..0914dbd
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.metrics.MetricsSystem;
+
+/**
+ * An {@link AvaticaHandler} that is capable of collecting metrics.
+ *
+ * <p>Also declares the metric names shared by such handlers.
+ */
+public interface MetricsAwareAvaticaHandler extends AvaticaHandler {
+
+ /**
+ * General prefix for all metrics in a handler.
+ */
+ String HANDLER_PREFIX = "Handler.";
+
+ /**
+ * Name of the metric used for timing requests from users; it is
+ * {@link #HANDLER_PREFIX} followed by {@code RequestTimings}.
+ */
+ String REQUEST_TIMER_NAME = HANDLER_PREFIX + "RequestTimings";
+
+ /**
+ * Returns the metrics system used by this handler.
+ *
+ * @return An instance of the {@link MetricsSystem} for this AvaticaHandler.
+ */
+ MetricsSystem getMetrics();
+
+}
+
+// End MetricsAwareAvaticaHandler.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java
new file mode 100644
index 0000000..f2b8728
--- /dev/null
+++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Avatica server that listens for HTTP requests.
+ */
+@PackageMarker
+package org.apache.calcite.avatica.server;
+
+import org.apache.calcite.avatica.util.PackageMarker;
+
+// End package-info.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java b/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java
new file mode 100644
index 0000000..ba4c5b8
--- /dev/null
+++ b/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Describes how to open a JDBC connection: URL, credentials and driver class name.
+ *
+ * <p>Specify one to run tests against a different database. (hsqldb is the default,
+ * see {@link #HSQLDB}.)
+ */
+public class ConnectionSpec {
+  // CALCITE-687 HSQLDB seems to fail oddly when multiple tests are run concurrently
+  private static final ReentrantLock HSQLDB_LOCK = new ReentrantLock();
+
+  /** Connection details for the "scott" HSQLDB database. */
+  public static final ConnectionSpec HSQLDB =
+      new ConnectionSpec(
+          ScottHsqldb.URI,
+          ScottHsqldb.USER,
+          ScottHsqldb.PASSWORD,
+          "org.hsqldb.jdbcDriver");
+
+  public final String url;
+  public final String username;
+  public final String password;
+  public final String driver;
+
+  /**
+   * Creates a specification from its four components; the values are stored as given.
+   *
+   * @param url JDBC URL
+   * @param username user name
+   * @param password password
+   * @param driver JDBC driver class name
+   */
+  public ConnectionSpec(String url, String username, String password,
+      String driver) {
+    this.driver = driver;
+    this.password = password;
+    this.username = username;
+    this.url = url;
+  }
+
+  /**
+   * Returns the single shared lock that tests use to serialize their access to the database,
+   * because concurrent access has been observed to cause problems with HSQLDB.
+   *
+   * @return the shared lock; every call returns the same instance
+   */
+  public static ReentrantLock getDatabaseLock() {
+    return HSQLDB_LOCK;
+  }
+}
+
+// End ConnectionSpec.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java
----------------------------------------------------------------------
diff --git a/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java b/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java
new file mode 100644
index 0000000..9749ef6
--- /dev/null
+++ b/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import org.apache.calcite.avatica.remote.MockJsonService;
+import org.apache.calcite.avatica.remote.MockProtobufService.MockProtobufServiceFactory;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * RemoteDriver tests that use a Mock implementation of a Connection.
+ *
+ * <p>The tests are parameterized: each one runs once against the mock JSON service and once
+ * against the mock protobuf service.
+ */
+@RunWith(Parameterized.class)
+public class RemoteDriverMockTest {
+  public static final String MJS = MockJsonService.Factory.class.getName();
+  public static final String MPBS = MockProtobufServiceFactory.class.getName();
+
+  /** Opens a connection that uses the mock JSON service factory. */
+  private static Connection mjs() throws SQLException {
+    return DriverManager.getConnection("jdbc:avatica:remote:factory=" + MJS);
+  }
+
+  /** Opens a connection that uses the mock protobuf service factory. */
+  private static Connection mpbs() throws SQLException {
+    return DriverManager.getConnection("jdbc:avatica:remote:factory=" + MPBS);
+  }
+
+  /** Supplies one connection-creating callable per mock service. */
+  @Parameters
+  public static List<Object[]> parameters() {
+    List<Object[]> parameters = new ArrayList<>();
+
+    parameters.add(new Object[] {new Callable<Connection>() {
+      public Connection call() throws SQLException {
+        return mjs();
+      }
+    } });
+
+    parameters.add(new Object[] {new Callable<Connection>() {
+      public Connection call() throws SQLException {
+        return mpbs();
+      }
+    } });
+
+    return parameters;
+  }
+
+  private final Callable<Connection> connectionFunctor;
+
+  public RemoteDriverMockTest(Callable<Connection> functor) {
+    this.connectionFunctor = functor;
+  }
+
+  /** Opens a connection for the current parameter, rethrowing any failure unchecked. */
+  private Connection getMockConnection() {
+    try {
+      return connectionFunctor.call();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test public void testRegister() throws Exception {
+    final Connection connection = getMockConnection();
+    assertThat(connection.isClosed(), is(false));
+    connection.close();
+    assertThat(connection.isClosed(), is(true));
+  }
+
+  @Test public void testSchemas() throws Exception {
+    final Connection connection = getMockConnection();
+    final ResultSet resultSet =
+        connection.getMetaData().getSchemas(null, null);
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertTrue(metaData.getColumnCount() >= 2);
+    assertEquals("TABLE_CATALOG", metaData.getColumnName(1));
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(2));
+    resultSet.close();
+    connection.close();
+  }
+
+  @Test public void testTables() throws Exception {
+    final Connection connection = getMockConnection();
+    final ResultSet resultSet =
+        connection.getMetaData().getTables(null, null, null, new String[0]);
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertTrue(metaData.getColumnCount() >= 3);
+    assertEquals("TABLE_CAT", metaData.getColumnName(1));
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(2));
+    assertEquals("TABLE_NAME", metaData.getColumnName(3));
+    resultSet.close();
+    connection.close();
+  }
+
+  @Ignore
+  @Test public void testNoFactory() throws Exception {
+    final Connection connection =
+        DriverManager.getConnection("jdbc:avatica:remote:");
+    assertThat(connection.isClosed(), is(false));
+    final ResultSet resultSet = connection.getMetaData().getSchemas();
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertEquals(2, metaData.getColumnCount());
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(1));
+    assertEquals("TABLE_CATALOG", metaData.getColumnName(2));
+    resultSet.close();
+    connection.close();
+    assertThat(connection.isClosed(), is(true));
+  }
+
+  @Ignore
+  @Test public void testCatalogsMock() throws Exception {
+    final Connection connection = getMockConnection();
+    assertThat(connection.isClosed(), is(false));
+    final ResultSet resultSet = connection.getMetaData().getSchemas();
+    assertFalse(resultSet.next());
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertEquals(2, metaData.getColumnCount());
+    assertEquals("TABLE_SCHEM", metaData.getColumnName(1));
+    assertEquals("TABLE_CATALOG", metaData.getColumnName(2));
+    resultSet.close();
+    connection.close();
+    assertThat(connection.isClosed(), is(true));
+  }
+
+  @Ignore
+  @Test public void testStatementExecuteQueryMock() throws Exception {
+    checkStatementExecuteQuery(getMockConnection(), false);
+  }
+
+  @Ignore
+  @Test public void testPrepareExecuteQueryMock() throws Exception {
+    checkStatementExecuteQuery(getMockConnection(), true);
+  }
+
+  /**
+   * Runs a three-row VALUES query and checks its metadata and row count, then closes the
+   * result set, the statement and the connection.
+   *
+   * @param connection connection to run the query on; closed by this method
+   * @param prepare whether to go through a {@link PreparedStatement} instead of a plain
+   *     {@link Statement}
+   */
+  private void checkStatementExecuteQuery(Connection connection,
+      boolean prepare) throws SQLException {
+    final String sql = "select * from (\n"
+        + " values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)";
+    final Statement statement;
+    final ResultSet resultSet;
+    final ParameterMetaData parameterMetaData;
+    if (prepare) {
+      final PreparedStatement ps = connection.prepareStatement(sql);
+      statement = ps;
+      parameterMetaData = ps.getParameterMetaData();
+      resultSet = ps.executeQuery();
+    } else {
+      statement = connection.createStatement();
+      parameterMetaData = null;
+      resultSet = statement.executeQuery(sql);
+    }
+    if (parameterMetaData != null) {
+      assertThat(parameterMetaData.getParameterCount(), equalTo(0));
+    }
+    final ResultSetMetaData metaData = resultSet.getMetaData();
+    assertEquals(2, metaData.getColumnCount());
+    assertEquals("C1", metaData.getColumnName(1));
+    assertEquals("C2", metaData.getColumnName(2));
+    assertTrue(resultSet.next());
+    assertTrue(resultSet.next());
+    assertTrue(resultSet.next());
+    assertFalse(resultSet.next());
+    resultSet.close();
+    statement.close();
+    connection.close();
+  }
+
+  @Test public void testResultSetsFinagled() throws Exception {
+    // These values specified in MockJsonService
+    final String table = "my_table";
+    final long value = 10;
+
+    // try-with-resources releases the result set and the connection even when an assertion
+    // fails; previously neither was ever closed.
+    try (Connection connection = getMockConnection();
+        // Not an accurate ResultSet per JDBC, but close enough for testing.
+        ResultSet results = connection.getMetaData().getColumns(null, null, table, null)) {
+      assertTrue(results.next());
+      assertEquals(table, results.getString(1));
+      assertEquals(value, results.getLong(2));
+    }
+  }
+
+}
+
+// End RemoteDriverMockTest.java