You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/14 04:35:09 UTC
[5/9] basic framework for physical plan. abstraction of graph classes.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
new file mode 100644
index 0000000..dd00e04
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
@@ -0,0 +1,142 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+
+import java.util.Map;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+
+public class BitComImpl implements BitCom {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComImpl.class);
+
+ private Map<DrillbitEndpoint, BitTunnel> tunnels = Maps.newConcurrentMap();
+ private Map<SocketChannel, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
+ private Object lock = new Object();
+ private BitServer server;
+ private DrillbitContext context;
+
+ public BitComImpl(DrillbitContext context) {
+ this.context = context;
+ }
+
+ public int start() throws InterruptedException, DrillbitStartupException {
+ server = new BitServer(new BitComHandler(modifier), context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), context);
+ int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
+ return server.bind(port);
+ }
+
+ private Future<BitTunnel> getNode(DrillbitEndpoint endpoint) {
+ return null;
+
+// BitTunnel t = tunnels.get(endpoint);
+// if (t == null) {
+// synchronized (lock) {
+// t = tunnels.get(endpoint);
+// if (t != null) return t;
+// BitClient c = new BitClient(new BitComHandler(modifier), context.getAllocator().getUnderlyingAllocator(),
+// context.getBitLoopGroup(), context);
+//
+// // need to figure what to do here with regards to waiting for handshake before returning. Probably need to add
+// // future registry so that new endpoint registration ping the registry.
+// throw new UnsupportedOperationException();
+// c.connectAsClient(endpoint.getAddress(), endpoint.getBitPort()).await();
+// t = new BitTunnel(c);
+// tunnels.put(endpoint, t);
+//
+// }
+// }
+// return null;
+ }
+
+ @Override
+ public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, DrillbitEndpoint node, RecordBatch batch) {
+ return null;
+ }
+
+ @Override
+ public DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, DrillbitEndpoint node,
+ PlanFragment fragment) {
+ return null;
+ }
+
+ @Override
+ public DrillRpcFuture<Ack> cancelFragment(FragmentContext context, DrillbitEndpoint node, FragmentHandle handle) {
+ return null;
+ }
+
+ @Override
+ public DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, DrillbitEndpoint node,
+ FragmentHandle handle) {
+ return null;
+ }
+
+ private final TunnelModifier modifier = new TunnelModifier();
+
+ /**
+ * Fully synchronized modifier. Contention should be low since endpoints shouldn't be constantly changing.
+ */
+ class TunnelModifier {
+ public BitTunnel remove(Channel ch) {
+ synchronized (this) {
+ DrillbitEndpoint endpoint = endpoints.remove(ch);
+ if (endpoint == null) {
+ logger
+ .warn("We attempted to find a endpoint from a provided channel and found none. This suggests a race condition or memory leak problem.");
+ return null;
+ }
+
+ BitTunnel tunnel = tunnels.remove(endpoint);
+ return tunnel;
+ }
+ }
+
+ public void create(SocketChannel channel, DrillbitEndpoint endpoint, RpcBus<?> bus) {
+ synchronized (this) {
+ endpoints.put(channel, endpoint);
+ tunnels.put(endpoint, new BitTunnel(bus));
+ }
+ }
+
+ }
+
+ public void close() {
+ Closeables.closeQuietly(server);
+ for (BitTunnel bt : tunnels.values()) {
+ bt.shutdownIfClient();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
new file mode 100644
index 0000000..e17b25c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.rpc.BasicServer;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.protobuf.MessageLite;
+
+public class BitServer extends BasicServer<RpcType>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServer.class);
+
+ private final DrillbitContext context;
+ private final BitComHandler handler;
+
+ public BitServer(BitComHandler handler, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
+ super(alloc, eventLoopGroup);
+ this.context = context;
+ this.handler = handler;
+ }
+
+ @Override
+ protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+ return handler.getResponseDefaultInstance(rpcType);
+ }
+
+ @Override
+ protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ return handler.handle(context, rpcType, pBody, dBody);
+ }
+
+ @Override
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch) {
+
+ return super.getCloseHandler(ch);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
new file mode 100644
index 0000000..02991ad
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcBus;
+
+import com.google.common.io.Closeables;
+
+/**
+ * Interface provided for communication between two bits. Provided by both a server and a client implementation.
+ */
+public class BitTunnel {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitTunnel.class);
+
+ final RpcBus<?> bus;
+
+ public BitTunnel(RpcBus<?> bus){
+ this.bus = bus;
+ }
+
+ public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, RecordBatch batch){
+ return null;
+ }
+
+ public DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, PlanFragment fragment){
+ return null;
+ }
+
+ public DrillRpcFuture<Ack> cancelFragment(FragmentContext context, FragmentHandle handle){
+ return null;
+ }
+
+ public DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, FragmentHandle handle){
+ return null;
+ }
+
+ public void shutdownIfClient(){
+ if(bus.isClient()) Closeables.closeQuietly(bus);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
new file mode 100644
index 0000000..cd6e15d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.QueryHandle;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+
+import com.google.protobuf.MessageLite;
+
+public class UserClient extends BasicClient<RpcType> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
+
+ public UserClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+ super(alloc, eventLoopGroup);
+ }
+
+
+ public DrillRpcFuture<QueryHandle> submitQuery(RunQuery query, ByteBuf data) throws RpcException {
+ return this.send(RpcType.RUN_QUERY, query, QueryHandle.class, data);
+ }
+
+ @Override
+ protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+ switch(rpcType){
+ case RpcType.ACK_VALUE:
+ return Ack.getDefaultInstance();
+ case RpcType.HANDSHAKE_VALUE:
+ return BitToUserHandshake.getDefaultInstance();
+ case RpcType.QUERY_HANDLE_VALUE:
+ return QueryHandle.getDefaultInstance();
+ case RpcType.QUERY_RESULT_VALUE:
+ return QueryResult.getDefaultInstance();
+ }
+ throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
+ }
+
+
+ @Override
+ protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ logger.debug("Received a server > client message of type " + rpcType);
+ return new Response(RpcType.ACK, Ack.getDefaultInstance(), null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
new file mode 100644
index 0000000..fe70c85
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -0,0 +1,90 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.QueryHandle;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.BasicServer;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.protobuf.MessageLite;
+
+public class UserServer extends BasicServer<RpcType> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
+
+ final DrillbitContext context;
+
+ public UserServer(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
+ super(alloc, eventLoopGroup);
+ this.context = context;
+ }
+
+ @Override
+ protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+ // a user server only expects acknowledgements on messages it creates.
+ switch (rpcType) {
+ case RpcType.ACK_VALUE:
+ return Ack.getDefaultInstance();
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ public DrillRpcFuture<QueryResult> sendResult(RunQuery query, ByteBuf data) throws RpcException {
+ return this.send(RpcType.QUERY_RESULT, query, QueryResult.class, data);
+ }
+
+
+ @Override
+ protected Response handle(SocketChannel channel, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ switch (rpcType) {
+
+ case RpcType.HANDSHAKE_VALUE:
+// logger.debug("Received handshake, responding in kind.");
+ return new Response(RpcType.HANDSHAKE, BitToUserHandshake.getDefaultInstance(), null);
+
+ case RpcType.RUN_QUERY_VALUE:
+// logger.debug("Received query to run. Returning query handle.");
+ return new Response(RpcType.QUERY_HANDLE, QueryHandle.newBuilder().setQueryId(1).build(), null);
+
+ case RpcType.REQUEST_RESULTS_VALUE:
+// logger.debug("Received results requests. Returning empty query result.");
+ return new Response(RpcType.QUERY_RESULT, QueryResult.getDefaultInstance(), null);
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/BackedRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/BackedRecord.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/BackedRecord.java
new file mode 100644
index 0000000..e71d381
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/BackedRecord.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+public class BackedRecord implements Record {
+ DiffSchema schema;
+ DataRecord record;
+
+ public BackedRecord(DiffSchema schema, DataRecord record) {
+ this.schema = schema;
+ this.record = record;
+ }
+
+ public void setBackend(DiffSchema schema, DataRecord record) {
+ this.record = record;
+ this.schema = schema;
+ }
+
+ @Override
+ public DiffSchema getSchemaChanges() {
+ return schema;
+ }
+
+ @Override
+ public Object getField(int fieldId) {
+ return record.getData(fieldId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java
new file mode 100644
index 0000000..41738bb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+public class DataRecord {
+ private final Map<Integer, Object> dataMap;
+
+ public DataRecord() {
+ this.dataMap = Maps.newHashMap();
+ }
+
+ public void addData(int fieldId, Object data, boolean isList) {
+ //TODO: Rethink lists vs object data handling
+ if(!dataMap.containsKey(fieldId)) {
+ if(isList) {
+ dataMap.put(fieldId, Lists.newArrayList(data));
+ } else {
+ dataMap.put(fieldId, data);
+ }
+ } else {
+ if(isList) {
+ ((List)dataMap.get(fieldId)).add(data);
+ } else {
+ throw new IllegalStateException("Overriding field id existing data!");
+ }
+ }
+ }
+
+ public Object getData(int fieldId) {
+ Preconditions.checkArgument(dataMap.containsKey(fieldId));
+ return dataMap.get(fieldId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
new file mode 100644
index 0000000..016e097
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class DiffSchema {
+ List<Field> addedFields;
+ List<Field> removedFields;
+
+ public DiffSchema() {
+ this.addedFields = Lists.newArrayList();
+ this.removedFields = Lists.newArrayList();
+ }
+
+ public void recordNewField(Field field) {
+ addedFields.add(field);
+ }
+
+ public boolean hasDiffFields() {
+ return !addedFields.isEmpty() || !removedFields.isEmpty();
+ }
+
+ public List<Field> getAddedFields() {
+ return addedFields;
+ }
+
+ public List<Field> getRemovedFields() {
+ return removedFields;
+ }
+
+ public void reset() {
+ addedFields.clear();
+ removedFields.clear();
+ }
+
+ public void addRemovedField(Field field) {
+ removedFields.add(field);
+ }
+
+ @Override
+ public String toString() {
+ return "DiffSchema{" +
+ "addedFields=" + addedFields +
+ ", removedFields=" + removedFields +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
new file mode 100644
index 0000000..e19c099
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
@@ -0,0 +1,135 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public abstract class Field {
+ final FieldType fieldType;
+ int fieldId;
+ String prefixFieldName;
+ String fieldName;
+ RecordSchema schema;
+ RecordSchema parentSchema;
+ boolean read;
+
+
+ public Field(RecordSchema parentSchema, IdGenerator<Integer> generator, FieldType fieldType, String prefixFieldName) {
+ this.fieldId = generator.getNextId();
+ this.fieldType = fieldType;
+ this.prefixFieldName = prefixFieldName;
+ this.parentSchema = parentSchema;
+ }
+
+ public Field assignSchema(RecordSchema newSchema) {
+ checkState(schema == null, "Schema already assigned to field: %s", fieldName);
+ checkState(fieldType.isEmbedSchema(), "Schema cannot be assigned to non-embedded types: %s", fieldType);
+ schema = newSchema;
+ return this;
+ }
+
+ public String getFullFieldName() {
+ return Strings.isNullOrEmpty(prefixFieldName) ? fieldName : prefixFieldName + "." + fieldName;
+ }
+
+ public int getFieldId() {
+ return fieldId;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ public void setRead(boolean read) {
+ this.read = read;
+ }
+
+ protected abstract Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper);
+
+ Objects.ToStringHelper getAttributesStringHelper() {
+ return Objects.toStringHelper(this).add("type", fieldType)
+ .add("id", fieldId)
+ .add("fullFieldName", getFullFieldName())
+ .add("schema", schema == null ? null : schema.toSchemaString()).omitNullValues();
+ }
+
+ @Override
+ public String toString() {
+ return addAttributesToHelper(getAttributesStringHelper()).toString();
+ }
+
+ public RecordSchema getParentSchema() {
+ return parentSchema;
+ }
+
+ public RecordSchema getAssignedSchema() {
+ return schema;
+ }
+
+ public FieldType getFieldType() {
+ return fieldType;
+ }
+
+ public void assignSchemaIfNull(RecordSchema newSchema) {
+ if (!hasSchema()) {
+ schema = newSchema;
+ }
+ }
+
+ public boolean isRead() {
+ return read;
+ }
+
+ public boolean hasSchema() {
+ return schema != null;
+ }
+
+ public static enum FieldType {
+ INTEGER(1),
+ FLOAT(2),
+ BOOLEAN(3),
+ STRING(4),
+ ARRAY(5, true),
+ MAP(6, true);
+
+ byte value;
+ boolean embedSchema;
+
+ FieldType(int value, boolean embedSchema) {
+ this.value = (byte) value;
+ this.embedSchema = embedSchema;
+ }
+
+ FieldType(int value) {
+ this(value, false);
+ }
+
+ public byte value() {
+ return value;
+ }
+
+ public boolean isEmbedSchema() {
+ return embedSchema;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/IdGenerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/IdGenerator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/IdGenerator.java
new file mode 100644
index 0000000..728e8e1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/IdGenerator.java
@@ -0,0 +1,13 @@
+package org.apache.drill.exec.schema;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: tnachen
+ * Date: 1/2/13
+ * Time: 10:50 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public interface IdGenerator<T> {
+ public T getNextId();
+ public void reset();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java
new file mode 100644
index 0000000..efdc8fd
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java
@@ -0,0 +1,108 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ListSchema implements RecordSchema {
+ private List<Field> fields;
+
+ public ListSchema() {
+ this.fields = Lists.newArrayList();
+ }
+
+ @Override
+ public void addField(Field field) {
+ if (field.fieldType.isEmbedSchema() || fields.isEmpty() || !isSingleTyped() ||
+ !Iterables.getOnlyElement(fields).equals(field.getFieldType())) {
+ fields.add(field);
+ }
+ }
+
+ @Override
+ public Field getField(String fieldName, int index) {
+ Field field;
+ if (isSingleTyped()) {
+ field = Iterables.getOnlyElement(fields, null);
+ } else {
+ field = index < fields.size() ? fields.get(index) : null;
+ }
+
+ return field;
+ }
+
+ @Override
+ public void removeField(Field field, int index) {
+ checkArgument(fields.size() > index);
+ checkArgument(checkNotNull(fields.get(index)).getFieldId() == field.getFieldId());
+ fields.remove(index);
+ }
+
+ @Override
+ public Iterable<? extends Field> getFields() {
+ return fields;
+ }
+
+ public boolean isSingleTyped() {
+ return fields.size() <= 1;
+ }
+
+ @Override
+ public String toSchemaString() {
+ StringBuilder builder = new StringBuilder("List_fields:[");
+ for (Field field : fields) {
+ builder.append(field.toString());
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+ @Override
+ public void resetMarkedFields() {
+ for (Field field : fields) {
+ field.setRead(false);
+ }
+ }
+
+ @Override
+ public Iterable<? extends Field> removeUnreadFields() {
+ final List<Field> removedFields = Lists.newArrayList();
+ Iterables.removeIf(fields, new Predicate<Field>() {
+ @Override
+ public boolean apply(Field field) {
+ if (!field.isRead()) {
+ removedFields.add(field);
+ return true;
+ } else if(field.hasSchema()) {
+ Iterables.addAll(removedFields, field.getAssignedSchema().removeUnreadFields());
+ }
+
+ return false;
+ }
+ });
+ return removedFields;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java
new file mode 100644
index 0000000..aa0d6aa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Objects;
+
+public class NamedField extends Field {
+ final FieldType keyType;
+
+ public NamedField(RecordSchema parentSchema, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, Field.FieldType fieldType) {
+ this(parentSchema, generator, prefixFieldName, fieldName, fieldType, FieldType.STRING);
+ }
+
+ public NamedField(RecordSchema parentSchema, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, Field.FieldType fieldType, FieldType keyType) {
+ super(parentSchema, generator, fieldType, prefixFieldName);
+ this.fieldName = fieldName;
+ this.keyType = FieldType.STRING;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ @Override
+ protected Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper) {
+ return helper.add("keyType", keyType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java
new file mode 100644
index 0000000..9cc30f6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java
@@ -0,0 +1,91 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+public class ObjectSchema implements RecordSchema {
+ private final Map<String, Field> fields;
+
+ public ObjectSchema() {
+ fields = Maps.newHashMap();
+ }
+
+ @Override
+ public void addField(Field field) {
+ fields.put(field.getFieldName(), field);
+ }
+
+ @Override
+ public Field getField(String fieldName, int index) {
+ return fields.get(fieldName);
+ }
+
+ @Override
+ public void removeField(Field field, int index) {
+ fields.remove(field.getFieldName());
+ }
+
+ @Override
+ public Iterable<? extends Field> getFields() {
+ return fields.values();
+ }
+
+ @Override
+ public String toSchemaString() {
+ StringBuilder builder = new StringBuilder("Object_fields:[");
+ for (Field field : fields.values()) {
+ builder.append(field.toString()).append(" ");
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+ @Override
+ public void resetMarkedFields() {
+ for (Field field : fields.values()) {
+ field.setRead(false);
+ }
+ }
+
+ @Override
+ public Iterable<? extends Field> removeUnreadFields() {
+ final List<Field> removedFields = Lists.newArrayList();
+ Iterables.removeIf(fields.values(), new Predicate<Field>() {
+ @Override
+ public boolean apply(Field field) {
+ if (!field.isRead()) {
+ removedFields.add(field);
+ return true;
+ } else if (field.hasSchema()) {
+ Iterables.addAll(removedFields, field.getAssignedSchema().removeUnreadFields());
+ }
+
+ return false;
+ }
+ });
+ return removedFields;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java
new file mode 100644
index 0000000..67fd2fa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Objects;
+
+public class OrderedField extends Field {
+ public OrderedField(RecordSchema parentSchema, IdGenerator<Integer> generator, FieldType fieldType, String prefixFieldName, int index) {
+ super(parentSchema, generator, fieldType, prefixFieldName);
+ this.fieldName = "[" + index + "]";
+ }
+
+ @Override
+ protected Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper) {
+ return helper;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java
new file mode 100644
index 0000000..16e83dc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java
@@ -0,0 +1,29 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.io.IOException;
+
+public interface Record {
+ public DiffSchema getSchemaChanges();
+ public Object getField(int fieldId);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/RecordSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/RecordSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/RecordSchema.java
new file mode 100644
index 0000000..db1f0ed
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/RecordSchema.java
@@ -0,0 +1,29 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+public interface RecordSchema {
+ public void addField(Field field);
+ public Field getField(String fieldName, int index);
+ public void removeField(Field field, int index);
+ public Iterable<? extends Field> getFields();
+ public String toSchemaString();
+ void resetMarkedFields();
+ Iterable<? extends Field> removeUnreadFields();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaIdGenerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaIdGenerator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaIdGenerator.java
new file mode 100644
index 0000000..27ed2d8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaIdGenerator.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+public class SchemaIdGenerator implements IdGenerator<Integer> {
+ private int nextId;
+
+ public SchemaIdGenerator() {
+ nextId = 1;
+ }
+
+ public Integer getNextId() {
+ return nextId++;
+ }
+
+ @Override
+ public void reset() {
+ nextId = 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaRecorder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaRecorder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaRecorder.java
new file mode 100644
index 0000000..54a4e0e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaRecorder.java
@@ -0,0 +1,122 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
+import org.apache.drill.exec.schema.json.jackson.ScanJson;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SchemaRecorder {
+ DiffSchema diffSchema;
+ RecordSchema currentSchema;
+ List<Field> removedFields;
+
+ public SchemaRecorder() {
+ currentSchema = new ObjectSchema();
+ diffSchema = new DiffSchema();
+ removedFields = Lists.newArrayList();
+ }
+
+ public RecordSchema getCurrentSchema() {
+ return currentSchema;
+ }
+
+ public void recordData(ScanJson.ReadType currentReadType, ScanJson.ReadType readType, JsonParser parser, IdGenerator generator, DataRecord record, Field.FieldType fieldType, String prefixFieldName, String fieldName, int index) throws IOException {
+ Field field = currentSchema.getField(fieldName, index);
+
+ if (field == null || field.getFieldType() != fieldType) {
+ if (field != null) {
+ removeStaleField(index, field);
+ }
+ field = currentReadType.createField(currentSchema, generator, prefixFieldName, fieldName, fieldType, index);
+ field.setRead(true);
+ diffSchema.recordNewField(field);
+ currentSchema.addField(field);
+ } else {
+ field.setRead(true);
+ }
+
+ if (readType != null) {
+ RecordSchema origSchema = currentSchema;
+ if (field != null) {
+ currentSchema = field.getAssignedSchema();
+ }
+
+ RecordSchema newSchema = readType.createSchema();
+ field.assignSchemaIfNull(newSchema);
+ setCurrentSchemaIfNull(newSchema);
+ readType.readRecord(parser, generator, this, record, field.getFullFieldName());
+
+ currentSchema = origSchema;
+ } else {
+ RecordSchema schema = field.getParentSchema();
+ record.addData(field.getFieldId(), JacksonHelper.getValueFromFieldType(parser, fieldType), schema != null && schema instanceof ListSchema);
+ }
+ }
+
+ private void removeStaleField(int index, Field field) {
+ if (field.hasSchema()) {
+ removeChildFields(field);
+ }
+ removedFields.add(field);
+ currentSchema.removeField(field, index);
+ }
+
+ private void removeChildFields(Field field) {
+ RecordSchema schema = field.getAssignedSchema();
+ if(schema == null) { return; }
+ for (Field childField : schema.getFields()) {
+ removedFields.add(childField);
+ if (childField.hasSchema()) {
+ removeChildFields(childField);
+ }
+ }
+ }
+
+ public boolean hasDiffs() {
+ return diffSchema.hasDiffFields();
+ }
+
+ public DiffSchema getDiffSchema() {
+ return hasDiffs() ? diffSchema : null;
+ }
+
+ public void setCurrentSchemaIfNull(RecordSchema newSchema) {
+ if (currentSchema == null) {
+ currentSchema = newSchema;
+ }
+ }
+
+ public void reset() {
+ currentSchema.resetMarkedFields();
+ diffSchema.reset();
+ removedFields.clear();
+ }
+
+ public void addMissingFields() {
+ for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
+ diffSchema.addRemovedField(field);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
new file mode 100644
index 0000000..0643710
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.json.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.google.common.collect.Maps;
+
+import org.apache.drill.exec.schema.Field;
+
+import java.io.IOException;
+import java.util.EnumMap;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class JacksonHelper {
+ private static final EnumMap<JsonToken, Field.FieldType> TYPE_LOOKUP = Maps.newEnumMap(JsonToken.class);
+
+ static {
+ TYPE_LOOKUP.put(JsonToken.VALUE_STRING, Field.FieldType.STRING);
+ TYPE_LOOKUP.put(JsonToken.VALUE_FALSE, Field.FieldType.BOOLEAN);
+ TYPE_LOOKUP.put(JsonToken.VALUE_TRUE, Field.FieldType.BOOLEAN);
+ TYPE_LOOKUP.put(JsonToken.START_ARRAY, Field.FieldType.ARRAY);
+ TYPE_LOOKUP.put(JsonToken.START_OBJECT, Field.FieldType.MAP);
+ TYPE_LOOKUP.put(JsonToken.VALUE_NUMBER_INT, Field.FieldType.INTEGER);
+ TYPE_LOOKUP.put(JsonToken.VALUE_NUMBER_FLOAT, Field.FieldType.FLOAT);
+ }
+
+ public static Field.FieldType getFieldType(JsonToken token) {
+ return TYPE_LOOKUP.get(token);
+ }
+
+ public static Object getValueFromFieldType(JsonParser parser, Field.FieldType fieldType) throws IOException {
+ switch(fieldType) {
+ case INTEGER:
+ return parser.getIntValue();
+ case STRING:
+ return parser.getValueAsString();
+ case FLOAT:
+ return parser.getFloatValue();
+ case BOOLEAN:
+ return parser.getBooleanValue();
+ default:
+ throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperator.java
new file mode 100644
index 0000000..e450ee9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperator.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.json.jackson;
+
+import com.google.common.collect.Lists;
+
+import org.apache.drill.exec.schema.Record;
+
+import java.io.IOException;
+import java.util.List;
+
+public abstract class PhysicalOperator {
+ List<PhysicalOperatorIterator> parents;
+
+ public PhysicalOperator(PhysicalOperatorIterator... parents) {
+ this.parents = Lists.newArrayList(parents);
+ }
+
+ public abstract PhysicalOperatorIterator getIterator();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperatorIterator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperatorIterator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperatorIterator.java
new file mode 100644
index 0000000..bf4053e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperatorIterator.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.json.jackson;
+
+import org.apache.drill.exec.schema.Record;
+
+import java.io.IOException;
+
+public interface PhysicalOperatorIterator{
+ public enum NextOutcome {NONE_LEFT, INCREMENTED_SCHEMA_UNCHANGED, INCREMENTED_SCHEMA_CHANGED}
+ public Record getRecord();
+ public NextOutcome next() throws IOException;
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/ScanJson.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/ScanJson.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/ScanJson.java
new file mode 100644
index 0000000..a1c30e9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/ScanJson.java
@@ -0,0 +1,203 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.json.jackson;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.common.io.Resources;
+
+import org.apache.drill.exec.schema.*;
+
+import java.io.*;
+import java.util.Map;
+
+public class ScanJson extends PhysicalOperator {
+ private ScanJsonIterator iterator;
+
+ private static final Map<JsonToken, ReadType> READ_TYPES = Maps.newHashMap();
+
+ static {
+ READ_TYPES.put(JsonToken.START_ARRAY, ReadType.ARRAY);
+ READ_TYPES.put(JsonToken.START_OBJECT, ReadType.OBJECT);
+ }
+
+ public ScanJson(String inputName) throws IOException {
+ super();
+ this.iterator = new ScanJsonIterator(inputName);
+ }
+
+ @Override
+ public PhysicalOperatorIterator getIterator() {
+ return iterator;
+ }
+
+ class ScanJsonIterator implements PhysicalOperatorIterator {
+ private JsonParser parser;
+ private SchemaRecorder recorder;
+ private BackedRecord record;
+ private IdGenerator generator;
+
+ private ScanJsonIterator(String inputName) throws IOException {
+ InputSupplier<InputStreamReader> input;
+ if (inputName.startsWith("resource:")) {
+ input = Resources.newReaderSupplier(Resources.getResource(inputName.substring(inputName.indexOf(':') + 1)), Charsets.UTF_8);
+ } else {
+ input = Files.newReaderSupplier(new File(inputName), Charsets.UTF_8);
+ }
+
+ JsonFactory factory = new JsonFactory();
+ parser = factory.createJsonParser(input.getInput());
+ parser.nextToken(); // Read to the first START_OBJECT token
+ recorder = new SchemaRecorder();
+ generator = new SchemaIdGenerator();
+ }
+
+ @Override
+ public Record getRecord() {
+ return record;
+ }
+
+ @Override
+ public NextOutcome next() throws IOException {
+ if (parser.isClosed() || !parser.hasCurrentToken()) {
+ return NextOutcome.NONE_LEFT;
+ }
+
+ recorder.reset();
+
+ DataRecord dataRecord = new DataRecord();
+ ReadType.OBJECT.readRecord(parser, generator, recorder, dataRecord, null);
+
+ parser.nextToken(); // Read to START_OBJECT token
+
+ if (!parser.hasCurrentToken()) {
+ parser.close();
+ }
+
+ recorder.addMissingFields();
+ if (record == null) {
+ record = new BackedRecord(recorder.getDiffSchema(), dataRecord);
+ } else {
+ record.setBackend(recorder.getDiffSchema(), dataRecord);
+ }
+ return recorder.hasDiffs() ? NextOutcome.INCREMENTED_SCHEMA_CHANGED : NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
+ }
+
+ public RecordSchema getCurrentSchema() {
+ return recorder.getCurrentSchema();
+ }
+ }
+
+ public static enum ReadType {
+ ARRAY(JsonToken.END_ARRAY) {
+ @Override
+ public Field createField(RecordSchema parentSchema, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, Field.FieldType fieldType, int index) {
+ return new OrderedField(parentSchema, generator, fieldType, prefixFieldName, index);
+ }
+
+ @Override
+ public RecordSchema createSchema() throws IOException {
+ return new ListSchema();
+ }
+ },
+ OBJECT(JsonToken.END_OBJECT) {
+ @Override
+ public Field createField(RecordSchema parentSchema, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, Field.FieldType fieldType, int index) {
+ return new NamedField(parentSchema, generator, prefixFieldName, fieldName, fieldType);
+ }
+
+ @Override
+ public RecordSchema createSchema() throws IOException {
+ return new ObjectSchema();
+ }
+ };
+
+ private final JsonToken endObject;
+
+ ReadType(JsonToken endObject) {
+ this.endObject = endObject;
+ }
+
+ public JsonToken getEndObject() {
+ return endObject;
+ }
+
+ public void readRecord(JsonParser parser, IdGenerator generator, SchemaRecorder recorder, DataRecord record, String prefixFieldName) throws IOException {
+ JsonToken token = parser.nextToken();
+ JsonToken endObject = getEndObject();
+ int index = 0;
+ while (token != endObject) {
+ if (token == JsonToken.FIELD_NAME) {
+ token = parser.nextToken();
+ continue;
+ }
+
+ String fieldName = parser.getCurrentName();
+ Field.FieldType fieldType = JacksonHelper.getFieldType(token);
+ ReadType readType = READ_TYPES.get(token);
+ if (fieldType != null) { // Including nulls
+ recorder.recordData(this, readType, parser, generator, record, fieldType, prefixFieldName, fieldName, index);
+ }
+ token = parser.nextToken();
+ ++index;
+ }
+ }
+
+ public abstract RecordSchema createSchema() throws IOException;
+
+ public abstract Field createField(RecordSchema parentSchema, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, Field.FieldType fieldType, int index);
+ }
+
+ public static void main(String[] args) throws IOException {
+ if (args.length != 1) {
+ System.err.println("Requires json path: ScanJson <json_path>");
+ return;
+ }
+
+ String jsonPath = args[0];
+
+ System.out.println("Reading json input...");
+ ScanJson sj = new ScanJson(jsonPath);
+ ScanJsonIterator iterator = (ScanJsonIterator) sj.getIterator();
+ long count = 0;
+
+ while (iterator.next() != PhysicalOperatorIterator.NextOutcome.NONE_LEFT) {
+ Record record = iterator.getRecord();
+ System.out.println("Record " + ++count);
+ System.out.println("Schema: ");
+ System.out.println(iterator.getCurrentSchema().toSchemaString());
+ System.out.println();
+ System.out.println("Changes since last record: ");
+ System.out.println();
+ System.out.println(record.getSchemaChanges());
+ System.out.println();
+ }
+ }
+}
+
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/ProtobufSchemaTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/ProtobufSchemaTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/ProtobufSchemaTransformer.java
new file mode 100644
index 0000000..a81a9d9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/ProtobufSchemaTransformer.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.transform;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.DescriptorProtos;
+
+import org.apache.drill.exec.schema.Field;
+import org.apache.drill.exec.schema.ListSchema;
+import org.apache.drill.exec.schema.ObjectSchema;
+import org.apache.drill.exec.schema.RecordSchema;
+
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ProtobufSchemaTransformer implements SchemaTransformer<DescriptorProtos.DescriptorProto> {
+ private static final Map<Field.FieldType, Function<Field, Object>> FIELD_MAP = Maps.newEnumMap(Field.FieldType.class);
+ private static final Map<Field.FieldType, DescriptorProtos.FieldDescriptorProto.Type> TYPE_MAP = Maps.newEnumMap(Field.FieldType.class);
+ private int fieldIndex = 0;
+ public static final String LIST_SCHEMA_NAME = "_EmbeddedList"; //Read from config?
+
+ static {
+ TYPE_MAP.put(Field.FieldType.BOOLEAN, DescriptorProtos.FieldDescriptorProto.Type.TYPE_BOOL);
+ TYPE_MAP.put(Field.FieldType.STRING, DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING);
+ TYPE_MAP.put(Field.FieldType.FLOAT, DescriptorProtos.FieldDescriptorProto.Type.TYPE_FLOAT);
+ TYPE_MAP.put(Field.FieldType.INTEGER, DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32);
+ }
+
+ private DescriptorProtos.DescriptorProto.Builder transformSchema(String name, DescriptorProtos.DescriptorProto.Builder parentBuilder, RecordSchema schema) {
+ if (schema instanceof ObjectSchema) {
+ return addObjectSchema(name, parentBuilder, ObjectSchema.class.cast(schema));
+ } else if (schema instanceof ListSchema) {
+ return addListSchema(name, ListSchema.class.cast(schema));
+ } else {
+ throw new RuntimeException("Unknown schema passed to transformer: " + schema);
+ }
+ }
+
+ public DescriptorProtos.DescriptorProto transformSchema(String name, RecordSchema schema) {
+ return transformSchema(name, null, schema).build();
+ }
+
+ private DescriptorProtos.DescriptorProto.Builder addListSchema(String name, ListSchema schema) {
+ DescriptorProtos.DescriptorProto.Builder builder = DescriptorProtos.DescriptorProto.newBuilder().setName(name);
+ DescriptorProtos.FieldDescriptorProto.Builder builderForValue = DescriptorProtos.FieldDescriptorProto.newBuilder();
+ builderForValue.setTypeName(LIST_SCHEMA_NAME).setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE);
+ builder.addField(builderForValue);
+ return builder;
+ }
+
+ private DescriptorProtos.DescriptorProto.Builder addObjectSchema(String name,
+ DescriptorProtos.DescriptorProto.Builder parentBuilder,
+ ObjectSchema schema) {
+ DescriptorProtos.DescriptorProto.Builder builder = DescriptorProtos.DescriptorProto.newBuilder().setName(name);
+ for (Field field : schema.getFields()) {
+ DescriptorProtos.FieldDescriptorProto.Builder builderForValue = DescriptorProtos.FieldDescriptorProto.newBuilder();
+ String fieldName = field.getFieldName();
+ builderForValue.setName(fieldName).setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL).setNumber(++fieldIndex);
+ if (field.hasSchema()) {
+ RecordSchema innerSchema = field.getAssignedSchema();
+ if (innerSchema instanceof ObjectSchema) {
+ addObjectSchema(fieldName, builder, (ObjectSchema) innerSchema);
+ DescriptorProtos.DescriptorProto innerProto = Iterables.getLast(builder.getNestedTypeList());
+ builderForValue.setTypeName(innerProto.getName()).setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE);
+ } else if (innerSchema instanceof ListSchema) {
+ builderForValue.setTypeName(LIST_SCHEMA_NAME).setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE);
+ }
+ } else {
+ builderForValue.setType(getProtoType(field.getFieldType()));
+ }
+ builder.addField(builderForValue);
+ }
+
+ if (parentBuilder != null) {
+ parentBuilder.addNestedType(builder);
+ }
+
+ return builder;
+ }
+
+ private DescriptorProtos.FieldDescriptorProto.Type getProtoType(Field.FieldType fieldType) {
+ return checkNotNull(TYPE_MAP.get(fieldType));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/SchemaTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/SchemaTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/SchemaTransformer.java
new file mode 100644
index 0000000..54d851f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/SchemaTransformer.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.transform;
+
+import org.apache.drill.exec.schema.Field;
+import org.apache.drill.exec.schema.ListSchema;
+import org.apache.drill.exec.schema.ObjectSchema;
+import org.apache.drill.exec.schema.RecordSchema;
+
+import java.util.List;
+
+public interface SchemaTransformer<T> {
+ public T transformSchema(String name, RecordSchema schema);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
new file mode 100644
index 0000000..6cc35e2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import java.net.InetAddress;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.BufferAllocator;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.HazelCache;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
+import org.apache.drill.exec.coord.ZKClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.service.ServiceEngine;
+
+import com.google.common.io.Closeables;
+
+public class Drillbit {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
+
+ public static void main(String[] cli) throws DrillbitStartupException, InterruptedException {
+ Drillbit bit = null;
+ try {
+ logger.debug("Setting up Drillbit.");
+ StartupOptions options = StartupOptions.parse(cli);
+ DrillConfig config = DrillConfig.create(options.getConfigLocation());
+ bit = new Drillbit(config);
+ } catch (Exception ex) {
+ throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
+ }
+
+
+ try {
+ logger.debug("Starting Drillbit.");
+ bit.run();
+ } catch (Exception e) {
+ throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
+ }
+ Thread.sleep(10000);
+ // at this point, the main thread can terminate as we have started all our working threads.
+ }
+
+ private final DrillbitContext context;
+ final BufferAllocator pool;
+ final ClusterCoordinator coord;
+ final ServiceEngine engine;
+ final DistributedCache cache;
+ private RegistrationHandle handle;
+
+ public Drillbit(DrillConfig config) throws Exception {
+ final DrillbitContext context = new DrillbitContext(config, this);
+ Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
+ this.context = context;
+ this.pool = BufferAllocator.getAllocator(context);
+ this.coord = new ZKClusterCoordinator(config);
+ this.engine = new ServiceEngine(context);
+ this.cache = new HazelCache(context.getConfig());
+ }
+
+ public void run() throws Exception {
+ coord.start();
+ engine.start();
+
+ DrillbitEndpoint md = DrillbitEndpoint.newBuilder().setAddress(InetAddress.getLocalHost().getHostAddress())
+ .setBitPort(engine.getBitPort()).setUserPort(engine.getUserPort()).build();
+ handle = coord.register(md);
+ cache.run(md);
+ }
+
+ public void close() {
+ if (coord != null) coord.unregister(handle);
+
+ try {
+ Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted while sleeping during coordination deregistration.");
+ }
+
+ Closeables.closeQuietly(engine);
+ Closeables.closeQuietly(coord);
+ Closeables.closeQuietly(pool);
+ logger.info("Shutdown completed.");
+ }
+
+ private class ShutdownThread extends Thread {
+ ShutdownThread(DrillConfig config) {
+ this.setName("ShutdownHook");
+ }
+
+ @Override
+ public void run() {
+ logger.info("Received shutdown request.");
+ close();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
new file mode 100644
index 0000000..94c8207
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.BufferAllocator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.bit.BitCom;
+
+public class DrillbitContext {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
+
+ private final DrillConfig config;
+ private final Drillbit underlyingBit;
+ private final NioEventLoopGroup loop;
+
+ public DrillbitContext(DrillConfig config, Drillbit underlyingBit) {
+ super();
+ this.config = config;
+ this.underlyingBit = underlyingBit;
+ this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-"));
+ }
+
+ public DrillConfig getConfig() {
+ return config;
+ }
+
+ public List<DrillbitEndpoint> getBits(){
+ return underlyingBit.coord.getAvailableEndpoints();
+ }
+
+ public BufferAllocator getAllocator(){
+ return underlyingBit.pool;
+ }
+
+
+ public NioEventLoopGroup getBitLoopGroup(){
+ return loop;
+ }
+
+ public BitCom getBitCom(){
+ return underlyingBit.engine.getBitCom();
+ }
+
+}