You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2015/09/17 02:10:49 UTC
[3/8] incubator-calcite git commit: [CALCITE-840] Protocol buffer
serialization over HTTP for Avatica Server (Josh Elser)
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
new file mode 100644
index 0000000..e4a1b3e
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/AbstractService.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A common base class for {@link Service} implementations that implement
+ * modifications made to response objects.
+ */
+public abstract class AbstractService implements Service {
+
+ /** Modifies a signature, changing the representation of numeric columns
+ * within it. This deals with the fact that JSON transmits a small long value,
+ * or a float which is a whole number, as an integer. Thus the accessors need
+ * be prepared to accept any numeric type. */
+ Meta.Signature finagle(Meta.Signature signature) {
+ final List<ColumnMetaData> columns = new ArrayList<>();
+ for (ColumnMetaData column : signature.columns) {
+ columns.add(finagle(column));
+ }
+ if (columns.equals(signature.columns)) {
+ return signature;
+ }
+ return new Meta.Signature(columns, signature.sql,
+ signature.parameters, signature.internalParameters,
+ signature.cursorFactory);
+ }
+
+ ColumnMetaData finagle(ColumnMetaData column) {
+ switch (column.type.rep) {
+ case BYTE:
+ case PRIMITIVE_BYTE:
+ case DOUBLE:
+ case PRIMITIVE_DOUBLE:
+ case FLOAT:
+ case PRIMITIVE_FLOAT:
+ case INTEGER:
+ case PRIMITIVE_INT:
+ case SHORT:
+ case PRIMITIVE_SHORT:
+ case LONG:
+ case PRIMITIVE_LONG:
+ return column.setRep(ColumnMetaData.Rep.NUMBER);
+ default:
+ // continue
+ break;
+ }
+ switch (column.type.id) {
+ case Types.VARBINARY:
+ case Types.BINARY:
+ return column.setRep(ColumnMetaData.Rep.STRING);
+ case Types.DECIMAL:
+ case Types.NUMERIC:
+ return column.setRep(ColumnMetaData.Rep.NUMBER);
+ default:
+ return column;
+ }
+ }
+
+ PrepareResponse finagle(PrepareResponse response) {
+ final Meta.StatementHandle statement = finagle(response.statement);
+ if (statement == response.statement) {
+ return response;
+ }
+ return new PrepareResponse(statement);
+ }
+
+ Meta.StatementHandle finagle(Meta.StatementHandle h) {
+ final Meta.Signature signature = finagle(h.signature);
+ if (signature == h.signature) {
+ return h;
+ }
+ return new Meta.StatementHandle(h.connectionId, h.id, signature);
+ }
+
+ ResultSetResponse finagle(ResultSetResponse r) {
+ if (r.updateCount != -1) {
+ assert r.signature == null;
+ return r;
+ }
+ final Meta.Signature signature = finagle(r.signature);
+ if (signature == r.signature) {
+ return r;
+ }
+ return new ResultSetResponse(r.connectionId, r.statementId, r.ownStatement,
+ signature, r.firstFrame, r.updateCount);
+ }
+
+ ExecuteResponse finagle(ExecuteResponse r) {
+ final List<ResultSetResponse> results = new ArrayList<>();
+ int changeCount = 0;
+ for (ResultSetResponse result : r.results) {
+ ResultSetResponse result2 = finagle(result);
+ if (result2 != result) {
+ ++changeCount;
+ }
+ results.add(result2);
+ }
+ if (changeCount == 0) {
+ return r;
+ }
+ return new ExecuteResponse(results);
+ }
+}
+
+// End AbstractService.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
index 16fcbc0..5e755f8 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/AvaticaRemoteConnectionProperty.java
@@ -36,8 +36,7 @@ public enum AvaticaRemoteConnectionProperty implements ConnectionProperty {
private final Type type;
private final Object defaultValue;
- private static final Map<String, AvaticaRemoteConnectionProperty>
- NAME_TO_PROPS;
+ private static final Map<String, AvaticaRemoteConnectionProperty> NAME_TO_PROPS;
static {
NAME_TO_PROPS = new HashMap<String, AvaticaRemoteConnectionProperty>();
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java
index 9e9c623..7d8c058 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Driver.java
@@ -45,6 +45,14 @@ public class Driver extends UnregisteredDriver {
super();
}
+ /**
+ * Defines the method of message serialization used by the Driver
+ */
+ public static enum Serialization {
+ JSON,
+ PROTOBUF;
+ }
+
@Override protected String getConnectStringPrefix() {
return CONNECT_STRING_PREFIX;
}
@@ -79,12 +87,39 @@ public class Driver extends UnregisteredDriver {
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
- service = new RemoteService(url);
+
+ Serialization serializationType = getSerialization(config);
+
+ switch (serializationType) {
+ case JSON:
+ service = new RemoteService(url);
+ break;
+ case PROTOBUF:
+ service = new RemoteProtobufService(url, new ProtobufTranslationImpl());
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled serialization type: " + serializationType);
+ }
} else {
service = new MockJsonService(Collections.<String, String>emptyMap());
}
return new RemoteMeta(connection, service);
}
+
+ private Serialization getSerialization(ConnectionConfig config) {
+ final String serializationStr = config.serialization();
+ Serialization serializationType = Serialization.JSON;
+ if (null != serializationStr) {
+ try {
+ serializationType = Serialization.valueOf(serializationStr.toUpperCase());
+ } catch (Exception e) {
+ // Log a warning instead of failing harshly? Intentionally no loggers available?
+ throw new RuntimeException(e);
+ }
+ }
+
+ return serializationType;
+ }
}
// End Driver.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/Handler.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Handler.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Handler.java
index 94eb01e..a3f564d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Handler.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Handler.java
@@ -18,9 +18,11 @@ package org.apache.calcite.avatica.remote;
/**
* API for text request-response calls to an Avatica server.
+ *
+ * @param <T> The type this handler accepts and returns
*/
-public interface Handler {
- String apply(String request);
+public interface Handler<T> {
+ T apply(T request);
}
// End Handler.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
index dee0636..f59218f 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonHandler.java
@@ -28,7 +28,7 @@ import java.io.StringWriter;
*
* @see org.apache.calcite.avatica.remote.JsonService
*/
-public class JsonHandler implements Handler {
+public class JsonHandler implements Handler<String> {
private final Service service;
protected static final ObjectMapper MAPPER = JsonService.MAPPER;
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
index 5c81b7b..dcb3d28 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -16,25 +16,19 @@
*/
package org.apache.calcite.avatica.remote;
-import org.apache.calcite.avatica.ColumnMetaData;
-import org.apache.calcite.avatica.Meta;
-
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringWriter;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.List;
/**
* Implementation of {@link org.apache.calcite.avatica.remote.Service}
* that encodes requests and responses as JSON.
*/
-public abstract class JsonService implements Service {
- protected static final ObjectMapper MAPPER;
+public abstract class JsonService extends AbstractService {
+ public static final ObjectMapper MAPPER;
static {
MAPPER = new ObjectMapper();
MAPPER.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
@@ -49,96 +43,6 @@ public abstract class JsonService implements Service {
* responses to and from the peer service. */
public abstract String apply(String request);
- /** Modifies a signature, changing the representation of numeric columns
- * within it. This deals with the fact that JSON transmits a small long value,
- * or a float which is a whole number, as an integer. Thus the accessors need
- * be prepared to accept any numeric type. */
- private static Meta.Signature finagle(Meta.Signature signature) {
- final List<ColumnMetaData> columns = new ArrayList<>();
- for (ColumnMetaData column : signature.columns) {
- columns.add(finagle(column));
- }
- if (columns.equals(signature.columns)) {
- return signature;
- }
- return new Meta.Signature(columns, signature.sql,
- signature.parameters, signature.internalParameters,
- signature.cursorFactory);
- }
-
- private static ColumnMetaData finagle(ColumnMetaData column) {
- switch (column.type.rep) {
- case BYTE:
- case PRIMITIVE_BYTE:
- case DOUBLE:
- case PRIMITIVE_DOUBLE:
- case FLOAT:
- case PRIMITIVE_FLOAT:
- case INTEGER:
- case PRIMITIVE_INT:
- case SHORT:
- case PRIMITIVE_SHORT:
- case LONG:
- case PRIMITIVE_LONG:
- return column.setRep(ColumnMetaData.Rep.NUMBER);
- }
- switch (column.type.id) {
- case Types.VARBINARY:
- case Types.BINARY:
- return column.setRep(ColumnMetaData.Rep.STRING);
- case Types.DECIMAL:
- case Types.NUMERIC:
- return column.setRep(ColumnMetaData.Rep.NUMBER);
- default:
- return column;
- }
- }
-
- private static PrepareResponse finagle(PrepareResponse response) {
- final Meta.StatementHandle statement = finagle(response.statement);
- if (statement == response.statement) {
- return response;
- }
- return new PrepareResponse(statement);
- }
-
- private static Meta.StatementHandle finagle(Meta.StatementHandle h) {
- final Meta.Signature signature = finagle(h.signature);
- if (signature == h.signature) {
- return h;
- }
- return new Meta.StatementHandle(h.connectionId, h.id, signature);
- }
-
- private static ResultSetResponse finagle(ResultSetResponse r) {
- if (r.updateCount != -1) {
- assert r.signature == null;
- return r;
- }
- final Meta.Signature signature = finagle(r.signature);
- if (signature == r.signature) {
- return r;
- }
- return new ResultSetResponse(r.connectionId, r.statementId, r.ownStatement,
- signature, r.firstFrame, r.updateCount);
- }
-
- private static ExecuteResponse finagle(ExecuteResponse r) {
- final List<ResultSetResponse> results = new ArrayList<>();
- int changeCount = 0;
- for (ResultSetResponse result : r.results) {
- ResultSetResponse result2 = finagle(result);
- if (result2 != result) {
- ++changeCount;
- }
- results.add(result2);
- }
- if (changeCount == 0) {
- return r;
- }
- return new ExecuteResponse(results);
- }
-
//@VisibleForTesting
protected static <T> T decode(String response, Class<T> valueType)
throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
new file mode 100644
index 0000000..76e2392
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import java.io.IOException;
+
+/**
+ * A Service implementation that performs protocol buffer serialization on request and responses
+ * on either side of computing a response from a request to mimic some transport to a server which
+ * would normally perform such computation.
+ */
+public class LocalProtobufService extends ProtobufService {
+ private final Service service;
+ private final ProtobufTranslation translation;
+
+ public LocalProtobufService(Service service, ProtobufTranslation translation) {
+ this.service = service;
+ this.translation = translation;
+ }
+
+ @Override public Response _apply(Request request) {
+ try {
+ // Serialize the request to "send to the server"
+ byte[] serializedRequest = translation.serializeRequest(request);
+
+ // *some transport would normally happen here*
+
+ // Fake deserializing that request somewhere else
+ Request request2 = translation.parseRequest(serializedRequest);
+
+ // Serialize the response from the service to "send to the client"
+ byte[] serializedResponse = translation.serializeResponse(request2.accept(service));
+
+ // *some transport would normally happen here*
+
+ // Deserialize the response on "the client"
+ return translation.parseResponse(serializedResponse);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
+
+// End LocalProtobufService.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
new file mode 100644
index 0000000..5a455fc
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaParameter;
+import org.apache.calcite.avatica.ColumnMetaData;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.MetaImpl;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A mock implementation of ProtobufService for testing.
+ *
+ * <p>It performs no serialization of requests and responses.
+ */
+public class MockProtobufService extends ProtobufService {
+
+ private static final Map<Request, Response> MAPPING;
+ static {
+ HashMap<Request, Response> mappings = new HashMap<>();
+
+ // Add in mappings
+
+ // Get the schema, no.. schema..?
+ mappings.put(
+ new SchemasRequest(null, null),
+ new ResultSetResponse(null, 1, true, null, Meta.Frame.EMPTY, -1));
+
+ // Get the tables, no tables exist
+ mappings.put(new TablesRequest(null, null, null, Collections.<String>emptyList()),
+ new ResultSetResponse(null, 150, true, null, Meta.Frame.EMPTY, -1));
+
+ // Create a statement, get back an id
+ mappings.put(new CreateStatementRequest("0"), new CreateStatementResponse("0", 1));
+
+ // Prepare and execute a query. Values and schema are returned
+ mappings.put(
+ new PrepareAndExecuteRequest("0", 1,
+ "select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)", -1),
+ new ResultSetResponse("0", 1, true,
+ Meta.Signature.create(
+ Arrays.<ColumnMetaData>asList(
+ MetaImpl.columnMetaData("C1", 0, Integer.class),
+ MetaImpl.columnMetaData("C2", 1, String.class)),
+ null, null, Meta.CursorFactory.ARRAY),
+ Meta.Frame.create(0, true,
+ Arrays.<Object>asList(new Object[] {1, "a"},
+ new Object[] {null, "b"}, new Object[] {3, "c"})), -1));
+
+ // Prepare a query. Schema for results are returned, but no values
+ mappings.put(
+ new PrepareRequest("0",
+ "select * from (\\n values(1, 'a'), (null, 'b'), (3, 'c')), as t (c1, c2)", -1),
+ new ResultSetResponse("0", 1, true,
+ Meta.Signature.create(
+ Arrays.<ColumnMetaData>asList(
+ MetaImpl.columnMetaData("C1", 0, Integer.class),
+ MetaImpl.columnMetaData("C2", 1, String.class)),
+ null, Collections.<AvaticaParameter>emptyList(),
+ Meta.CursorFactory.ARRAY),
+ null, -1));
+
+ MAPPING = Collections.unmodifiableMap(mappings);
+ }
+
+ @Override public Response _apply(Request request) {
+ if (request instanceof CloseConnectionRequest) {
+ return new CloseConnectionResponse();
+ }
+
+ return dispatch(request);
+ }
+
+ /**
+ * Fetches the static response for the given request.
+ *
+ * @param request the client's request
+ * @return the appropriate response
+ * @throws RuntimeException if no mapping is found for the request
+ */
+ private Response dispatch(Request request) {
+ Response response = MAPPING.get(request);
+
+ if (null == response) {
+ throw new RuntimeException("Had no response mapping for " + request);
+ }
+
+ return response;
+ }
+
+ /**
+ * A factory that instantiates the mock protobuf service.
+ */
+ public static class MockProtobufServiceFactory implements Service.Factory {
+ @Override public Service create(AvaticaConnection connection) {
+ return new MockProtobufService();
+ }
+ }
+}
+
+// End MockProtobufService.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java
new file mode 100644
index 0000000..52f0a03
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.remote.Service.Response;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import java.io.IOException;
+
+/**
+ * Dispatches serialized protocol buffer messages to the provided {@link Service}
+ * by converting them to the POJO Request. Returns back the serialized protocol
+ * buffer response.
+ */
+public class ProtobufHandler implements Handler<byte[]> {
+
+ private final Service service;
+ private final ProtobufTranslation translation;
+
+ public ProtobufHandler(Service service, ProtobufTranslation translation) {
+ this.service = service;
+ this.translation = translation;
+ }
+
+ @Override public byte[] apply(byte[] requestBytes) {
+ // Transform the protocol buffer bytes into a POJO
+ // Encapsulate the task of transforming this since
+ // the bytes also contain the PB request class name.
+ Service.Request requestPojo;
+ try {
+ requestPojo = translation.parseRequest(requestBytes);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+
+ // Get the response for the request
+ Response response = requestPojo.accept(service);
+
+ try {
+ // Serialize it into bytes for the wire.
+ return translation.serializeResponse(response);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
+
+// End ProtobufHandler.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
new file mode 100644
index 0000000..13d5860
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Message;
+
+/**
+ * Service implementation that encodes requests and responses as protocol buffers.
+ */
+public abstract class ProtobufService extends AbstractService {
+
+ /**
+ * Derived class should implement this method to transport requests and
+ * responses to and from the peer service.
+ */
+ public abstract Response _apply(Request request);
+
+ @Override public ResultSetResponse apply(CatalogsRequest request) {
+ return (ResultSetResponse) _apply(request);
+ }
+
+ @Override public ResultSetResponse apply(SchemasRequest request) {
+ return (ResultSetResponse) _apply(request);
+ }
+
+ @Override public ResultSetResponse apply(TablesRequest request) {
+ return (ResultSetResponse) _apply(request);
+ }
+
+ @Override public ResultSetResponse apply(TableTypesRequest request) {
+ return (ResultSetResponse) _apply(request);
+ }
+
+ @Override public ResultSetResponse apply(TypeInfoRequest request) {
+ return (ResultSetResponse) _apply(request);
+ }
+
+ @Override public ResultSetResponse apply(ColumnsRequest request) {
+ return (ResultSetResponse) _apply(request);
+ }
+
+ @Override public PrepareResponse apply(PrepareRequest request) {
+ return finagle((PrepareResponse) _apply(request));
+ }
+
+ @Override public ExecuteResponse apply(PrepareAndExecuteRequest request) {
+ return finagle((ExecuteResponse) _apply(request));
+ }
+
+ @Override public FetchResponse apply(FetchRequest request) {
+ return (FetchResponse) _apply(request);
+ }
+
+ @Override public CreateStatementResponse apply(CreateStatementRequest request) {
+ return (CreateStatementResponse) _apply(request);
+ }
+
+ @Override public CloseStatementResponse apply(CloseStatementRequest request) {
+ return (CloseStatementResponse) _apply(request);
+ }
+
+ @Override public CloseConnectionResponse apply(CloseConnectionRequest request) {
+ return (CloseConnectionResponse) _apply(request);
+ }
+
+ @Override public ConnectionSyncResponse apply(ConnectionSyncRequest request) {
+ return (ConnectionSyncResponse) _apply(request);
+ }
+
+ @Override public DatabasePropertyResponse apply(DatabasePropertyRequest request) {
+ return (DatabasePropertyResponse) _apply(request);
+ }
+
+ /**
+ * Determines whether the given message has the field, denoted by the provided number, set.
+ *
+ * @param msg The protobuf message
+ * @param desc The descriptor for the message
+ * @param fieldNum The identifier for the field
+ * @return True if the message contains the field, false otherwise
+ */
+ public static boolean hasField(Message msg, Descriptor desc, int fieldNum) {
+ return msg.hasField(desc.findFieldByNumber(fieldNum));
+ }
+}
+
+// End ProtobufService.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
new file mode 100644
index 0000000..acb82db
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.remote.Service.Request;
+import org.apache.calcite.avatica.remote.Service.Response;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import java.io.IOException;
+
+/**
+ * Generic interface to support parsing of serialized protocol buffers between client and server.
+ */
+public interface ProtobufTranslation {
+
+ /**
+ * Serializes a {@link Response} as a protocol buffer.
+ *
+ * @param response The response to serialize
+ * @throws IOException If there are errors during serialization
+ */
+ byte[] serializeResponse(Response response) throws IOException;
+
+ /**
+ * Serializes a {@link Request} as a protocol buffer.
+ *
+ * @param request The request to serialize
+ * @throws IOException If there are errors during serialization
+ */
+ byte[] serializeRequest(Request request) throws IOException;
+
+ /**
+ * Parses a serialized protocol buffer request into a {@link Request}.
+ *
+ * @param bytes Serialized protocol buffer request from client
+ * @return A Request object for the given bytes
+ * @throws InvalidProtocolBufferException If the protocol buffer cannot be deserialized
+ */
+ Request parseRequest(byte[] bytes) throws InvalidProtocolBufferException;
+
+ /**
+ * Parses a serialized protocol buffer response into a {@link Response}.
+ *
+ * @param bytes Serialized protocol buffer request from server
+ * @return The Response object for the given bytes
+ * @throws InvalidProtocolBufferException If the protocol buffer cannot be deserialized
+ */
+ Response parseResponse(byte[] bytes) throws InvalidProtocolBufferException;
+}
+
+// End ProtobufTranslation.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
new file mode 100644
index 0000000..5a4cc11
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.proto.Common.WireMessage;
+import org.apache.calcite.avatica.proto.Requests.CatalogsRequest;
+import org.apache.calcite.avatica.proto.Requests.CloseConnectionRequest;
+import org.apache.calcite.avatica.proto.Requests.CloseStatementRequest;
+import org.apache.calcite.avatica.proto.Requests.ColumnsRequest;
+import org.apache.calcite.avatica.proto.Requests.ConnectionSyncRequest;
+import org.apache.calcite.avatica.proto.Requests.CreateStatementRequest;
+import org.apache.calcite.avatica.proto.Requests.DatabasePropertyRequest;
+import org.apache.calcite.avatica.proto.Requests.FetchRequest;
+import org.apache.calcite.avatica.proto.Requests.PrepareAndExecuteRequest;
+import org.apache.calcite.avatica.proto.Requests.PrepareRequest;
+import org.apache.calcite.avatica.proto.Requests.SchemasRequest;
+import org.apache.calcite.avatica.proto.Requests.TableTypesRequest;
+import org.apache.calcite.avatica.proto.Requests.TablesRequest;
+import org.apache.calcite.avatica.proto.Requests.TypeInfoRequest;
+import org.apache.calcite.avatica.proto.Responses.CloseConnectionResponse;
+import org.apache.calcite.avatica.proto.Responses.CloseStatementResponse;
+import org.apache.calcite.avatica.proto.Responses.ConnectionSyncResponse;
+import org.apache.calcite.avatica.proto.Responses.CreateStatementResponse;
+import org.apache.calcite.avatica.proto.Responses.DatabasePropertyResponse;
+import org.apache.calcite.avatica.proto.Responses.ExecuteResponse;
+import org.apache.calcite.avatica.proto.Responses.FetchResponse;
+import org.apache.calcite.avatica.proto.Responses.PrepareResponse;
+import org.apache.calcite.avatica.proto.Responses.ResultSetResponse;
+import org.apache.calcite.avatica.remote.Service.Request;
+import org.apache.calcite.avatica.remote.Service.Response;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ProtobufTranslationImpl} that translates
+ * protobuf requests to POJO requests.
+ */
+public class ProtobufTranslationImpl implements ProtobufTranslation {
+
+ // Extremely ugly mapping of PB class name into a means to convert it to the POJO
+ private static final Map<String, RequestTranslator> REQUEST_PARSERS;
+ private static final Map<String, ResponseTranslator> RESPONSE_PARSERS;
+
+ static {
+ HashMap<String, RequestTranslator> reqParsers = new HashMap<>();
+ reqParsers.put(CatalogsRequest.class.getName(),
+ new RequestTranslator(CatalogsRequest.PARSER, new Service.CatalogsRequest()));
+ reqParsers.put(CloseConnectionRequest.class.getName(),
+ new RequestTranslator(CloseConnectionRequest.PARSER, new Service.CloseConnectionRequest()));
+ reqParsers.put(CloseStatementRequest.class.getName(),
+ new RequestTranslator(CloseStatementRequest.PARSER, new Service.CloseStatementRequest()));
+ reqParsers.put(ColumnsRequest.class.getName(),
+ new RequestTranslator(ColumnsRequest.PARSER, new Service.ColumnsRequest()));
+ reqParsers.put(ConnectionSyncRequest.class.getName(),
+ new RequestTranslator(ConnectionSyncRequest.PARSER, new Service.ConnectionSyncRequest()));
+ reqParsers.put(CreateStatementRequest.class.getName(),
+ new RequestTranslator(CreateStatementRequest.PARSER, new Service.CreateStatementRequest()));
+ reqParsers.put(DatabasePropertyRequest.class.getName(),
+ new RequestTranslator(DatabasePropertyRequest.PARSER,
+ new Service.DatabasePropertyRequest()));
+ reqParsers.put(FetchRequest.class.getName(),
+ new RequestTranslator(FetchRequest.PARSER, new Service.FetchRequest()));
+ reqParsers.put(PrepareAndExecuteRequest.class.getName(),
+ new RequestTranslator(PrepareAndExecuteRequest.PARSER,
+ new Service.PrepareAndExecuteRequest()));
+ reqParsers.put(PrepareRequest.class.getName(),
+ new RequestTranslator(PrepareRequest.PARSER, new Service.PrepareRequest()));
+ reqParsers.put(SchemasRequest.class.getName(),
+ new RequestTranslator(SchemasRequest.PARSER, new Service.SchemasRequest()));
+ reqParsers.put(TablesRequest.class.getName(),
+ new RequestTranslator(TablesRequest.PARSER, new Service.TablesRequest()));
+ reqParsers.put(TableTypesRequest.class.getName(),
+ new RequestTranslator(TableTypesRequest.PARSER, new Service.TableTypesRequest()));
+ reqParsers.put(TypeInfoRequest.class.getName(),
+ new RequestTranslator(TypeInfoRequest.PARSER, new Service.TypeInfoRequest()));
+
+ REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers);
+
+ HashMap<String, ResponseTranslator> respParsers = new HashMap<>();
+ respParsers.put(CloseConnectionResponse.class.getName(),
+ new ResponseTranslator(CloseConnectionResponse.PARSER,
+ new Service.CloseConnectionResponse()));
+ respParsers.put(CloseStatementResponse.class.getName(),
+ new ResponseTranslator(CloseStatementResponse.PARSER,
+ new Service.CloseStatementResponse()));
+ respParsers.put(ConnectionSyncResponse.class.getName(),
+ new ResponseTranslator(ConnectionSyncResponse.PARSER,
+ new Service.ConnectionSyncResponse()));
+ respParsers.put(CreateStatementResponse.class.getName(),
+ new ResponseTranslator(CreateStatementResponse.PARSER,
+ new Service.CreateStatementResponse()));
+ respParsers.put(DatabasePropertyResponse.class.getName(),
+ new ResponseTranslator(DatabasePropertyResponse.PARSER,
+ new Service.DatabasePropertyResponse()));
+ respParsers.put(ExecuteResponse.class.getName(),
+ new ResponseTranslator(ExecuteResponse.PARSER, new Service.ExecuteResponse()));
+ respParsers.put(FetchResponse.class.getName(),
+ new ResponseTranslator(FetchResponse.PARSER, new Service.FetchResponse()));
+ respParsers.put(PrepareResponse.class.getName(),
+ new ResponseTranslator(PrepareResponse.PARSER, new Service.PrepareResponse()));
+ respParsers.put(ResultSetResponse.class.getName(),
+ new ResponseTranslator(ResultSetResponse.PARSER, new Service.ResultSetResponse()));
+
+ RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers);
+ }
+
+ /**
+ * Fetches the concrete message's Parser implementation.
+ *
+ * @param className The protocol buffer class name
+ * @return The Parser for the class
+ * @throws IllegalArgumentException If the argument is null or if a Parser for the given
+ * class name is not found.
+ */
+ public static RequestTranslator getParserForRequest(String className) {
+ if (null == className) {
+ throw new IllegalArgumentException("Cannot fetch parser for null class name");
+ }
+
+ RequestTranslator translator = REQUEST_PARSERS.get(className);
+ if (null == translator) {
+ throw new IllegalArgumentException("Cannot find parser for " + className);
+ }
+
+ return translator;
+ }
+
+ /**
+ * Fetches the concrete message's Parser implementation.
+ *
+ * @param className The protocol buffer class name
+ * @return The Parser for the class
+ * @throws IllegalArgumentException If the argument is null or if a Parser for the given
+ * class name is not found.
+ */
+ public static ResponseTranslator getParserForResponse(String className) {
+ if (null == className) {
+ throw new IllegalArgumentException("Cannot fetch parser for null class name");
+ }
+
+ ResponseTranslator translator = RESPONSE_PARSERS.get(className);
+ if (null == translator) {
+ throw new IllegalArgumentException("Cannot find parser for " + className);
+ }
+
+ return translator;
+ }
+
+ @Override public byte[] serializeResponse(Response response) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Message responseMsg = response.serialize();
+ serializeMessage(out, responseMsg);
+ return out.toByteArray();
+ }
+
+ @Override public byte[] serializeRequest(Request request) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Message requestMsg = request.serialize();
+ serializeMessage(out, requestMsg);
+ return out.toByteArray();
+ }
+
+ void serializeMessage(OutputStream out, Message msg) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ msg.writeTo(baos);
+
+ // TODO Using ByteString is copying the bytes of the message which sucks. Could try to
+ // lift the ZeroCopy implementation from HBase.
+ WireMessage wireMsg = WireMessage.newBuilder().setName(msg.getClass().getName()).
+ setWrappedMessage(ByteString.copyFrom(baos.toByteArray())).build();
+
+ wireMsg.writeTo(out);
+ }
+
+ @Override public Request parseRequest(byte[] bytes) throws InvalidProtocolBufferException {
+ WireMessage wireMsg = WireMessage.parseFrom(bytes);
+
+ String serializedMessageClassName = wireMsg.getName();
+ RequestTranslator translator = getParserForRequest(serializedMessageClassName);
+
+ return translator.transform(wireMsg.getWrappedMessage());
+ }
+
+ @Override public Response parseResponse(byte[] bytes) throws InvalidProtocolBufferException {
+ WireMessage wireMsg = WireMessage.parseFrom(bytes);
+
+ String serializedMessageClassName = wireMsg.getName();
+ ResponseTranslator translator = getParserForResponse(serializedMessageClassName);
+
+ return translator.transform(wireMsg.getWrappedMessage());
+ }
+}
+
+// End ProtobufTranslationImpl.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index b5404dc..deacdee 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -20,7 +20,6 @@ import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaParameter;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.avatica.ConnectionPropertiesImpl;
-import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.MetaImpl;
import java.sql.SQLException;
@@ -168,8 +167,7 @@ class RemoteMeta extends MetaImpl {
@Override public ExecuteResult prepareAndExecute(StatementHandle h,
String sql, long maxRowCount, PrepareCallback callback) {
// sync connection state if necessary
- connectionSync(new ConnectionHandle(h.connectionId),
- new ConnectionPropertiesImpl());
+ connectionSync(new ConnectionHandle(h.connectionId), new ConnectionPropertiesImpl());
final Service.ExecuteResponse response;
try {
synchronized (callback.getMonitor()) {
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
new file mode 100644
index 0000000..cb1a468
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.AvaticaUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * ProtobufService implementation that queries against a remote implementation, using
+ * protocol buffers as the serialized form.
+ */
+public class RemoteProtobufService extends ProtobufService {
+ private final URL url;
+ private final ProtobufTranslation translation;
+
+ public RemoteProtobufService(URL url, ProtobufTranslation translation) {
+ this.url = url;
+ this.translation = translation;
+ }
+
+ @Override public Response _apply(Request request) {
+ try {
+ final HttpURLConnection connection =
+ (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("POST");
+ connection.setDoInput(true);
+ connection.setDoOutput(true);
+ try (DataOutputStream wr = new DataOutputStream(connection.getOutputStream())) {
+ // Convert the Request to a protobuf and send it over the wire
+ wr.write(translation.serializeRequest(request));
+ wr.flush();
+ wr.close();
+ }
+ final int responseCode = connection.getResponseCode();
+ if (responseCode != HttpURLConnection.HTTP_OK) {
+ throw new RuntimeException("response code " + responseCode);
+ }
+ final InputStream inputStream = connection.getInputStream();
+ // Read the (serialized protobuf) response off the wire and convert it back to a Response
+ return translation.parseResponse(AvaticaUtils.readFullyToBytes(inputStream));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
+
+// End RemoteProtobufService.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
new file mode 100644
index 0000000..0dadc78
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RequestTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+/**
+ * Encapsulate the logic of transforming a protobuf Request message into the Avatica POJO request.
+ */
+public class RequestTranslator {
+
+ private final Parser<? extends Message> parser;
+ private final Service.Request impl;
+
+ public RequestTranslator(Parser<? extends Message> parser, Service.Request impl) {
+ this.parser = parser;
+ this.impl = impl;
+ }
+
+ public Service.Request transform(ByteString serializedMessage) throws
+ InvalidProtocolBufferException {
+ Message msg = parser.parseFrom(serializedMessage);
+ return impl.deserialize(msg);
+ }
+}
+
+// End RequestTranslator.java
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/cb7c213c/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java
new file mode 100644
index 0000000..0311e13
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/ResponseTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+/**
+ * Encapsulate the logic of transforming a protobuf Response message into the Avatica POJO Response.
+ */
+public class ResponseTranslator {
+
+ private final Parser<? extends Message> parser;
+ private final Service.Response impl;
+
+ public ResponseTranslator(Parser<? extends Message> parser, Service.Response impl) {
+ this.parser = parser;
+ this.impl = impl;
+ }
+
+ public Service.Response transform(ByteString serializedMessage) throws
+ InvalidProtocolBufferException {
+ Message msg = parser.parseFrom(serializedMessage);
+ return impl.deserialize(msg);
+ }
+}
+
+// End ResponseTranslator.java