You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/14 04:35:08 UTC
[4/9] basic framework for physical plan. abstraction of graph classes.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java
new file mode 100644
index 0000000..66bbad8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/StartupOptions.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+/**
+ * Command line options for startup, populated from the raw argument array by JCommander. {@code =} is configured as
+ * the separator between an option name and its value.
+ */
+@Parameters(separators = "=")
+public class StartupOptions {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StartupOptions.class);
+
+  @Parameter(names = {"-h", "--help"}, help = true, description = "Provide description of usage.")
+  private boolean help;
+
+  @Parameter(names = {"-d", "--debug"}, required = false, description = "Whether you want to run the program in debug mode.")
+  private boolean debug;
+
+  @Parameter(names = {"-c", "--config"}, required = false, description = "Configuration file you want to load. Defaults to loading 'drill-override.conf' from the classpath.")
+  private String configLocation;
+
+  /** Unnamed (main) parameter: collects the arguments that are not attached to any named option. */
+  @Parameter
+  private List<String> exccess = new ArrayList<String>();
+
+  /** @return true when the debug flag was given. */
+  public boolean isDebug() {
+    return debug;
+  }
+
+  /** @return the value passed for the config option, or null when it was not given. */
+  public String getConfigLocation() {
+    return configLocation;
+  }
+
+  /** @return the arguments that were not attached to any named option. */
+  public List<String> getExccess() {
+    return exccess;
+  }
+
+  /**
+   * Binds the given arguments to a new StartupOptions instance.
+   *
+   * When the help flag is present the usage text is printed and the JVM exits with status 0.
+   *
+   * @param cliArgs
+   *          The raw command line arguments.
+   * @return The populated options.
+   */
+  public static StartupOptions parse(String[] cliArgs) {
+    logger.debug("Parsing arguments.");
+    final StartupOptions options = new StartupOptions();
+    final JCommander commander = new JCommander(options, cliArgs);
+    if (!options.help) {
+      return options;
+    }
+    commander.usage();
+    System.exit(0);
+    return options;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
new file mode 100644
index 0000000..97db72e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.service;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.bit.BitCom;
+import org.apache.drill.exec.rpc.bit.BitComImpl;
+import org.apache.drill.exec.rpc.user.UserServer;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.common.io.Closeables;
+
+/**
+ * Groups the {@link UserServer} and the {@link BitComImpl} built for one {@link DrillbitContext} so that they can be
+ * started and closed together.
+ */
+public class ServiceEngine implements Closeable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ServiceEngine.class);
+
+  UserServer userServer;
+  BitComImpl bitCom;
+  int userPort;
+  int bitPort;
+  DrillbitContext context;
+
+  /**
+   * Creates the user server and the bit communication layer. The ports are obtained later, in {@link #start()}.
+   *
+   * @param context
+   *          Supplies the allocator used by the user server and the configuration read in {@link #start()}.
+   */
+  public ServiceEngine(DrillbitContext context){
+    // Keep the context: start() reads the initial user port from its configuration. Without this assignment the
+    // field stays null and start() fails with a NullPointerException.
+    this.context = context;
+    ByteBufAllocator allocator = context.getAllocator().getUnderlyingAllocator();
+    userServer = new UserServer(allocator, new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), context);
+    bitCom = new BitComImpl(context);
+  }
+
+  /**
+   * Binds the user server, passing the port configured under {@link ExecConstants#INITIAL_USER_PORT}, and then starts
+   * the bit communication layer. The ports returned by both calls are kept for {@link #getUserPort()} and
+   * {@link #getBitPort()}.
+   *
+   * @throws DrillbitStartupException
+   *           Declared for startup failures of the underlying services.
+   * @throws InterruptedException
+   *           Declared for interruption while the underlying services start.
+   */
+  public void start() throws DrillbitStartupException, InterruptedException{
+    userPort = userServer.bind(context.getConfig().getInt(ExecConstants.INITIAL_USER_PORT));
+    bitPort = bitCom.start();
+  }
+
+  /** @return the port returned when the bit communication layer was started; 0 before {@link #start()} has run. */
+  public int getBitPort(){
+    return bitPort;
+  }
+
+  /** @return the port returned when the user server was bound; 0 before {@link #start()} has run. */
+  public int getUserPort(){
+    return userPort;
+  }
+
+  /** @return the bit communication layer created by the constructor. */
+  public BitCom getBitCom(){
+    return bitCom;
+  }
+
+  /**
+   * Closes the user server and then the bit communication layer. Both are closed quietly, so an IOException raised
+   * while closing the first does not prevent closing the second.
+   */
+  @Override
+  public void close() throws IOException {
+    Closeables.closeQuietly(userServer);
+    Closeables.closeQuietly(bitCom);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java
new file mode 100644
index 0000000..0536206
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/QueryOptimizerRule.java
@@ -0,0 +1,21 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+/**
+ * Marker type for optimizer rules supplied by a storage engine; {@link StorageEngine#getOptimizerRules()} returns a
+ * list of these. The interface declares no methods yet.
+ */
+public interface QueryOptimizerRule {
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
new file mode 100644
index 0000000..9fc4165
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import org.apache.drill.exec.exception.ExecutionSetupException;
+import org.apache.drill.exec.ops.OutputMutator;
+import org.apache.drill.exec.record.BatchSchema;
+
+/**
+ * Reads records into an {@link OutputMutator}: setup(...) receives the expected schema and the output target, and
+ * next() reports how many records each call added.
+ */
+public interface RecordReader {
+
+ /**
+ * Configure the RecordReader with the provided schema and the record batch that should be written to.
+ *
+ * @param expectedSchema
+ * The set of fields that should be written to as well as the expected types for those fields. In the case
+ * that RecordReader has a known schema and the expectedSchema does not match the actual schema, an
+ * ExecutionSetupException will be thrown.
+ * @param output
+ * The place where output for a particular scan should be written. The record reader is responsible for
+ * mutating the set of schema values for that particular record.
+ * @throws ExecutionSetupException
+ * If the reader cannot be configured, including the schema mismatch described for expectedSchema.
+ */
+ public abstract void setup(BatchSchema expectedSchema, OutputMutator output) throws ExecutionSetupException;
+
+ /**
+ * Increment record reader forward, writing into the provided output batch.
+ *
+ * @return The number of additional records added to the output.
+ */
+ public abstract int next();
+
+ /**
+ * NOTE(review): no contract is documented for this method; presumably it is called once reading is finished so
+ * the reader can release what it holds -- confirm against implementations.
+ */
+ public abstract void cleanup();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordRecorder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordRecorder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordRecorder.java
new file mode 100644
index 0000000..c5fcc42
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordRecorder.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Receives {@link RecordBatch} instances through record(RecordBatch). Instances are handed out by
+ * StorageEngine.getWriter(FragmentContext, WriteEntry).
+ */
+public interface RecordRecorder {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRecorder.class);
+
+ /**
+ * Prepare this recorder for use.
+ * NOTE(review): presumably has to be called before the first record(...) call -- confirm.
+ *
+ * @throws IOException
+ * If preparation fails.
+ */
+ public void setup() throws IOException;
+
+ /**
+ * Hand a batch to this recorder.
+ *
+ * @param batch
+ * The batch to record.
+ * @return NOTE(review): the meaning of the boolean is not documented here (presumably a success flag) -- confirm
+ * with implementations.
+ */
+ public boolean record(RecordBatch batch);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
new file mode 100644
index 0000000..83749c7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.collect.ListMultimap;
+
+/**
+ * Contract for a storage back end: reports whether reading and writing are supported, enumerates the read entries
+ * of a logical {@link Scan}, reports where those entries can be read, and hands out readers and writers.
+ */
+public interface StorageEngine {
+ /** @return whether this engine supports reading. */
+ public boolean supportsRead();
+
+ /** @return whether this engine supports writing. */
+ public boolean supportsWrite();
+
+ /**
+ * Partitioning schemes. NOTE(review): no method declared in this interface refers to this enum yet.
+ */
+ public enum PartitionCapabilities {
+ NONE, HASH, RANGE;
+ }
+
+ /** @return the optimizer rules contributed by this engine. */
+ public List<QueryOptimizerRule> getOptimizerRules();
+
+ /**
+ * Get the set of read entries required for a particular Scan (read) node. This is somewhat analogous to traditional
+ * MapReduce. The difference is, this is the most granular split paradigm.
+ *
+ * @param scan
+ * The configured scan entries.
+ * @return The read entries for the given scan.
+ * @throws IOException
+ * If the read entries cannot be determined.
+ */
+ public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException;
+
+ /**
+ * Get the set of Drillbit endpoints that are available for each read entry. Note that it is possible for a read entry
+ * to have no Drillbit locations. In that case, the multimap will contain no values for that read entry.
+ *
+ * @param entries
+ * The read entries to look up locations for.
+ * @return Multimap of {@code ReadEntry -> List<DrillbitEndpoint>} for ReadEntries with available locations.
+ */
+ public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries);
+
+ /**
+ * Get a particular reader for a fragment context.
+ * @param context
+ * The fragment context the reader is created for.
+ * @param readEntry
+ * The read entry the reader should read.
+ * @return A reader for the given read entry.
+ * @throws IOException
+ * If the reader cannot be created.
+ */
+ public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException;
+
+ /**
+ * Get a particular writer for a fragment context.
+ *
+ * @param context
+ * The fragment context the writer is created for.
+ * @param writeEntry
+ * The write entry the writer should write to.
+ * @return A recorder for the given write entry.
+ * @throws IOException
+ * If the writer cannot be created.
+ */
+ public RecordRecorder getWriter(FragmentContext context, WriteEntry writeEntry) throws IOException;
+
+
+ /** One unit of read work, as returned by getReadEntries(Scan). */
+ public interface ReadEntry {
+ /** @return the estimated cost of this entry. */
+ public Cost getCostEstimate();
+ }
+
+ /** Write-side counterpart of ReadEntry, accepted by getWriter(...). Declares no members yet. */
+ public interface WriteEntry {
+ }
+
+ /**
+ * Cost estimate with disk, network, memory and cpu components.
+ * NOTE(review): the units of these values are not specified in this file -- confirm.
+ */
+ public static class Cost {
+ public long disk;
+ public long network;
+ public long memory;
+ public long cpu;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
new file mode 100644
index 0000000..b2e31e9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
@@ -0,0 +1,82 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Discovers {@link StorageEngine} implementations by classpath scanning and creates engine instances on demand, one
+ * per {@link StorageEngineConfig}.
+ *
+ * An implementation is usable when it exposes a public constructor taking
+ * {@code (<? extends StorageEngineConfig>, DrillbitContext)}; the constructor is registered under the concrete
+ * config class it accepts.
+ */
+public class StorageEngineRegistry {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StorageEngineRegistry.class);
+
+  // Config class -> constructor of the engine that accepts that config class.
+  private Map<Object, Constructor<? extends StorageEngine>> availableEngines = new HashMap<Object, Constructor<? extends StorageEngine>>();
+  // Engines that have already been created, keyed by the config they were created from.
+  private Map<StorageEngineConfig, StorageEngine> activeEngines = new HashMap<StorageEngineConfig, StorageEngine>();
+
+  private DrillbitContext context;
+
+  /**
+   * Creates the registry and immediately scans for engine implementations using the context's configuration.
+   *
+   * @param context
+   *          Passed as the second constructor argument to every engine created by this registry.
+   */
+  public StorageEngineRegistry(DrillbitContext context){
+    this.context = context;
+    setup(context.getConfig());
+  }
+
+  /**
+   * Scans the packages listed under {@link ExecConstants#STORAGE_ENGINE_SCAN_PACKAGES} and registers every public
+   * constructor with the signature {@code (<? extends StorageEngineConfig>, DrillbitContext)}.
+   *
+   * @param config
+   *          The configuration that names the packages to scan.
+   */
+  @SuppressWarnings("unchecked")
+  public void setup(DrillConfig config){
+    Collection<Class<? extends StorageEngine>> engines = PathScanner.scanForImplementations(StorageEngine.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    logger.debug("Loading storage engines {}", engines);
+    for(Class<? extends StorageEngine> engine: engines){
+      int i = 0;
+      for(Constructor<?> c : engine.getConstructors()){
+        Class<?>[] params = c.getParameterTypes();
+        // Skip anything that is NOT (StorageEngineConfig subtype, DrillbitContext). The second-parameter test must be
+        // "!=": with "==" the matching constructors were the ones being skipped.
+        if(params.length != 2 || params[1] != DrillbitContext.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
+          logger.debug("Skipping ReferenceStorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext)]", c, engine);
+          continue;
+        }
+        // Safe cast: c was obtained from a Class<? extends StorageEngine>.
+        availableEngines.put(params[0], (Constructor<? extends StorageEngine>) c);
+        i++;
+      }
+      if(i == 0){
+        logger.debug("Skipping registration of ReferenceStorageEngine {} as it doesn't have a constructor with the parameters of (StorageEngineConfig, DrillbitContext)", engine.getCanonicalName());
+      }
+    }
+  }
+
+  /**
+   * Returns the engine for the given configuration, creating and caching it on first use.
+   *
+   * @param engineConfig
+   *          The configuration identifying the engine. Its concrete class selects the constructor, and the config
+   *          itself is the cache key.
+   * @return The engine previously created for an equal configuration, or a newly created one.
+   * @throws SetupException
+   *           If no constructor is registered for the configuration's class, or if creating the engine fails. A
+   *           SetupException thrown by the engine's constructor is rethrown as-is.
+   */
+  public StorageEngine getEngine(StorageEngineConfig engineConfig) throws SetupException{
+    StorageEngine engine = activeEngines.get(engineConfig);
+    if(engine != null) return engine;
+    Constructor<? extends StorageEngine> c = availableEngines.get(engineConfig.getClass());
+    if(c == null) throw new SetupException(String.format("Failure finding StorageEngine constructor for config %s", engineConfig));
+    try {
+      engine = c.newInstance(engineConfig, context);
+      // Remember the instance so that later lookups for the same config reuse it.
+      activeEngines.put(engineConfig, engine);
+      return engine;
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      // Unwrap reflection's wrapper so callers see the constructor's own failure.
+      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
+      if(t instanceof SetupException) throw ((SetupException) t);
+      throw new SetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
new file mode 100644
index 0000000..43c408d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
@@ -0,0 +1,32 @@
+// Messages describing Drillbit endpoints and related coordination data.
+// Java classes are generated into the outer class org.apache.drill.exec.proto.CoordinationProtos.
+package exec;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "CoordinationProtos";
+option optimize_for = LITE_RUNTIME;
+
+// Address and ports of one Drillbit, plus the roles it offers.
+// NOTE(review): user_port and bit_port presumably are the user RPC and bit RPC listen ports -- confirm.
+message DrillbitEndpoint{
+ optional string address = 1;
+ optional int32 user_port = 2;
+ optional int32 bit_port = 3;
+ optional Roles roles = 4;
+}
+
+// A registered service instance: an id, a registration time and the endpoint it refers to.
+// NOTE(review): the unit of registrationTimeUTC is not stated (presumably epoch milliseconds) -- confirm.
+// It is also the only camelCase field name in this file; the other fields use snake_case.
+message DrillServiceInstance{
+ optional string id = 1;
+ optional int64 registrationTimeUTC = 2;
+ optional DrillbitEndpoint endpoint = 3;
+}
+
+// Queue length reported for an endpoint at report_time.
+// NOTE(review): the unit of report_time is not stated -- confirm.
+message WorkQueueStatus{
+ optional DrillbitEndpoint endpoint = 1;
+ optional int32 queue_length = 2;
+ optional int64 report_time = 3;
+}
+
+// Role flags of a Drillbit. Every flag defaults to true when the field is absent.
+message Roles{
+ optional bool sql_query = 1 [default = true];
+ optional bool logical_plan = 2 [default = true];
+ optional bool physical_plan = 3 [default = true];
+ optional bool java_executor = 4 [default = true];
+ optional bool distributed_cache = 5 [default = true];
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
new file mode 100644
index 0000000..cd8bda2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
@@ -0,0 +1,65 @@
+// Messages and RPC type codes of the exec.bit package.
+// Java classes are generated into the outer class org.apache.drill.exec.proto.ExecProtos.
+package exec.bit;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "ExecProtos";
+option optimize_for = LITE_RUNTIME;
+import "SchemaDef.proto";
+import "Coordination.proto";
+
+
+////// Bit RPC ///////
+// NOTE(review): this banner used to read "UserToBit RPC", the same text as the banner in User.proto. The values
+// below are bit requests and bit responses, so the old text looks copy-pasted -- confirm.
+enum RpcType {
+ HANDSHAKE = 0;
+ ACK = 1;
+ GOODBYE = 2;
+
+ // bit requests
+ // NOTE(review): "INIATILIZE" is a misspelling of "INITIALIZE". Renaming the value changes the generated Java
+ // constant, so it is only flagged here.
+ REQ_INIATILIZE_FRAGMENT = 3; // Returns Handle
+ REQ_RECORD_BATCH = 4; // send record batch overview, returns Ack
+ REQ_BATCH_CHUNK = 5; // send additional batch chunk, returns Ack.
+ REQ_CANCEL_FRAGMENT = 6; // send a cancellation message for a fragment, returns Ack
+ REQ_FRAGMENT_STATUS = 7; // get a fragment status, returns FragmentStatus
+ REQ_BIT_STATUS = 8; // get bit status.
+
+ // bit responses
+ RESP_FRAGMENT_HANDLE = 9;
+ RESP_FRAGMENT_STATUS = 10;
+ RESP_BIT_STATUS = 11;
+ RESP_BATCH_CHUNK = 12;
+}
+
+
+// Describes a set of columns: an optional schema, a record count, a total size and one BitColumn per column.
+// NOTE(review): the unit of total_size and of BitColumn.length is not stated (presumably bytes) -- confirm.
+message BitColumnData {
+
+ // Encodings a column can use; PROTOBUF is the only one defined so far.
+ enum ColumnEncoding {
+ PROTOBUF = 0;
+ }
+
+ // One column: a field number, a length and the encoding ("mode") used for it.
+ message BitColumn {
+ optional int32 field = 1;
+ optional int32 length = 2;
+ optional ColumnEncoding mode = 3;
+ }
+
+ optional SchemaDef schema = 1;
+ optional int32 record_count = 2;
+ optional int32 total_size = 3;
+ repeated BitColumn column = 4;
+
+}
+
+
+// Handshake message carrying an endpoint.
+message BitHandshake{
+ optional DrillbitEndpoint endpoint = 1;
+}
+
+// Placeholder messages: declared without any fields so far.
+message BitBatchChunk {}
+message BitStatus {}
+message FragmentStatus {}
+message RecordBatchHeader {}
+message PlanFragment {}
+
+// Identifies a fragment by id.
+message FragmentHandle {
+ optional int64 fragment_id = 1;
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
new file mode 100644
index 0000000..ebc7dca
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
@@ -0,0 +1,35 @@
+// RPC envelope messages of the exec.rpc package.
+// Java classes are generated into the outer class org.apache.drill.exec.proto.GeneralRPCProtos.
+package exec.rpc;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "GeneralRPCProtos";
+option optimize_for = LITE_RUNTIME;
+
+// Acknowledgement carrying a single ok flag.
+message Ack{
+ optional bool ok = 1;
+}
+
+// Kind of an RPC message: a request, a response, or a failure response.
+enum RpcMode {
+ REQUEST = 0;
+ RESPONSE = 1;
+ RESPONSE_FAILURE = 2;
+}
+
+// Header of an RPC message.
+message RpcHeader{
+ optional RpcMode mode = 1;
+ optional int32 coordination_id = 2; // reusable coordination identifier. Sender defines. Server returns on return. Irrelevant for purely single direction rpc.
+ optional int32 rpc_type = 3; // a rpc mode specific rpc type.
+}
+
+// A whole RPC message: header, protobuf body and an optional raw body.
+// The "required" / "optional" remarks below describe intent only; all three fields are declared optional in the
+// schema, so protobuf itself does not enforce their presence.
+message CompleteRpcMessage {
+ optional RpcHeader header = 1; // required
+ optional bytes protobuf_body = 2; // required
+ optional bytes raw_body = 3; // optional
+}
+
+// Class to be used when an unexpected exception occurs while a rpc call is being evaluated.
+message RpcFailure {
+ optional int64 error_id = 1; // for server trackback.
+ optional int32 error_code = 2; // system defined error code.
+ optional string short_error = 3;
+ optional string long_error = 4;
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
new file mode 100644
index 0000000..44e2df9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
@@ -0,0 +1,37 @@
+// Schema description messages.
+// Java classes are generated into the outer class org.apache.drill.exec.proto.SchemaDefProtos.
+package exec;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "SchemaDefProtos";
+option optimize_for = LITE_RUNTIME;
+
+
+// Schema Definitions //
+
+// Value types of a field.
+// NOTE(review): LATE presumably means the type is only known later (late binding) -- confirm.
+enum DataType {
+ LATE = 0;
+ INT32 = 1;
+ INT64 = 2;
+ FLOAT32 = 3;
+ FLOAT64 = 4;
+ UTF8 = 5;
+ BYTES = 6;
+}
+
+// Cardinality of a field (REQUIRED, OPTIONAL, REPEATED), or MAP for a field that is described by child fields.
+enum DataMode {
+ REQUIRED = 0;
+ OPTIONAL = 1;
+ REPEATED = 2;
+ MAP = 3;
+}
+
+// A schema: a list of field definitions.
+message SchemaDef {
+ repeated FieldDef field = 1;
+}
+
+// A single, possibly nested, field definition.
+message FieldDef {
+ optional string name = 1;
+ optional DataMode mode = 2;
+
+ // If mode is REQUIRED, OPTIONAL or REPEATED (DataMode 0-2), type should be populated and fields should be empty. Otherwise (MAP), type should be empty and fields should be defined.
+ optional DataType type = 3;
+ repeated FieldDef fields = 4;
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
new file mode 100644
index 0000000..225d1a0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
@@ -0,0 +1,93 @@
+// Messages and RPC type codes of the exec.user package.
+// Java classes are generated into the outer class org.apache.drill.exec.proto.UserProtos.
+package exec.user;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "UserProtos";
+option optimize_for = LITE_RUNTIME;
+import "SchemaDef.proto";
+
+////// UserToBit RPC ///////
+// NOTE(review): value 5 is not assigned in this enum; whether that gap is intentional is not stated -- confirm.
+enum RpcType {
+ HANDSHAKE = 0;
+ ACK = 1;
+ GOODBYE = 2;
+
+ // user to bit
+ RUN_QUERY = 3;
+ REQUEST_RESULTS = 4;
+
+ // bit to user
+ QUERY_RESULT = 6;
+ QUERY_HANDLE = 7;
+}
+
+// Handshake sent from the user side.
+message UserToBitHandshake {
+ optional bool support_listening = 1;
+ optional int32 rpc_version = 2;
+}
+
+// Asks for results of the query identified by query_id, limited to maximum_responses.
+message RequestResults {
+ optional int64 query_id = 1;
+ optional int32 maximum_responses = 2;
+}
+
+// Submits a plan together with the way results should be delivered.
+// NOTE(review): the format of the plan string is not stated here -- confirm.
+message RunQuery {
+ optional QueryResultsMode mode = 1;
+ optional string plan = 2;
+}
+
+// How results are delivered. Numbering starts at 1; no value 0 is defined.
+enum QueryResultsMode {
+ STREAM_FULL = 1; // Server will inform the client regularly on the status of the query. Once the query is completed, service will inform the client as each query chunk is made available.
+ STREAM_FIRST = 2; // Server will inform the client regularly on the status of the query. Once the query is completed, server will inform the client of the first query chunk.
+ QUERY_FOR_STATUS = 3; // Client will need to query for status of query.
+}
+
+
+// Handshake sent from the bit side.
+message BitToUserHandshake {
+ optional int32 rpc_version = 1;
+}
+
+// Identifies a query by id.
+message QueryHandle {
+ optional int64 query_id = 1;
+}
+
+// Status of one node.
+// NOTE(review): the unit of memory_footprint is not stated (presumably bytes) -- confirm.
+message NodeStatus {
+ optional int32 node_id = 1;
+ optional int64 memory_footprint = 2;
+}
+
+// Result, or status update, for a query.
+message QueryResult {
+ enum Outcome {
+ RUNNING = 0;
+ FAILED = 1;
+ COMPLETED = 2;
+ WAITING = 3;
+ }
+
+ optional Outcome outcome = 1;
+ optional SchemaDef schema = 2;
+ optional bool is_last_chunk = 3;
+ optional int32 row_count = 4;
+ optional int64 records_scan = 5;
+ optional int64 records_error = 6;
+ optional int64 submission_time = 7;
+ repeated NodeStatus node_status = 8;
+ repeated Error error = 9;
+}
+
+// Start and end position (row and column) of an error inside a piece of text.
+// NOTE(review): field numbers start at 2; tag 1 is unused -- confirm whether that is intentional.
+message TextErrorLocation{
+ optional int32 start_column = 2;
+ optional int32 start_row = 3;
+ optional int32 end_column = 4;
+ optional int32 end_row = 5;
+}
+
+// One error attached to a QueryResult.
+message Error{
+ optional int64 error_id = 1; // for debug tracing purposes
+ optional string host = 2;
+ optional int32 error_type = 3;
+ optional string message = 4;
+ optional TextErrorLocation error = 5; //optional, used when providing location of error within a piece of text.
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..ad18d6e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
@@ -0,0 +1,28 @@
+// This file tells Drill to consider this module when class path scanning.
+// This file can also include any supplementary configuration information.
+// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.exec: {
+ cluster-id: "drillbits1"
+ rpc: {
+ user.port : 31010,
+ bit.port : 31011
+ },
+ optimizer: {
+ implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
+ },
+
+ // NOTE(review): presumably ZooKeeper client settings (2181 is ZooKeeper's default client port). The units of
+ // refresh, timeout and retry.delay are not stated here (presumably milliseconds) -- confirm.
+ zk: {
+ connect: "localhost:2181",
+ root: "/drill",
+ refresh: 500,
+ timeout: 1000,
+ retry: {
+ count: 7200,
+ delay: 500
+ }
+ }
+
+ // No comma is needed after the zk block: HOCON accepts a newline between fields in place of a comma.
+ network: {
+ start: 35000
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/BBOutputStream.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/BBOutputStream.java b/sandbox/prototype/exec/java-exec/src/test/java/BBOutputStream.java
new file mode 100644
index 0000000..7f03dfa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/BBOutputStream.java
@@ -0,0 +1,38 @@
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+/**
+ * An {@link OutputStream} that appends every written byte to a caller-supplied
+ * {@link ByteBuffer} using relative puts.
+ */
+public class BBOutputStream extends OutputStream {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BBOutputStream.class);
+
+  /** Destination buffer; bytes land at its current position. */
+  private final ByteBuffer target;
+
+  /**
+   * @param buf buffer that receives the written bytes
+   */
+  public BBOutputStream(ByteBuffer buf) {
+    target = buf;
+  }
+
+  /**
+   * Stores the low-order eight bits of {@code b} in the buffer.
+   *
+   * @param b value whose lowest byte is written
+   */
+  @Override
+  public void write(int b) throws IOException {
+    final byte lowByte = (byte) b;
+    target.put(lowByte);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/CompressingBytesColumn.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/CompressingBytesColumn.java b/sandbox/prototype/exec/java-exec/src/test/java/CompressingBytesColumn.java
new file mode 100644
index 0000000..4fb67ed
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/CompressingBytesColumn.java
@@ -0,0 +1,46 @@
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+/**
+ * Skeleton of a bytes column backed by direct buffers. The {@code add} and
+ * {@code write} operations are still empty placeholders.
+ */
+public class CompressingBytesColumn {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompressingBytesColumn.class);
+
+  /** Byte count used as the sizing unit for the buffers below (1024 * 1024). */
+  int mb = 1024 * 1024;
+
+  /** Direct buffer of 20 units. */
+  ByteBuffer values = ByteBuffer.allocateDirect(20 * mb);
+  /** Direct buffer of one unit. */
+  ByteBuffer fromCompressBuffer = ByteBuffer.allocateDirect(mb);
+  /** Direct buffer of one unit. */
+  ByteBuffer toCompressBuffer = ByteBuffer.allocateDirect(mb);
+
+  public CompressingBytesColumn() {
+  }
+
+  /** Placeholder: currently does nothing. */
+  public void add(byte[] bytes, int start, int length) {
+  }
+
+  /** Placeholder: currently does nothing. */
+  public void add(ByteBuffer buffer) {
+  }
+
+  /** Placeholder: currently does nothing. */
+  public void write(OutputStream stream) {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/ExternalSort.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/ExternalSort.java b/sandbox/prototype/exec/java-exec/src/test/java/ExternalSort.java
new file mode 100644
index 0000000..c6233ae
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/ExternalSort.java
@@ -0,0 +1,21 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+/** Empty placeholder class; it currently declares only a logger. */
+public class ExternalSort {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSort.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/GenerateExternalSortData.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/GenerateExternalSortData.java b/sandbox/prototype/exec/java-exec/src/test/java/GenerateExternalSortData.java
new file mode 100644
index 0000000..dca7d27
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/GenerateExternalSortData.java
@@ -0,0 +1,124 @@
+import java.io.BufferedReader;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+
+import org.apache.hadoop.conf.Configuration;
+import org.xerial.snappy.Snappy;
+
+import com.google.common.base.Charsets;
+import com.google.protobuf.CodedOutputStream;
+
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+public class GenerateExternalSortData {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GenerateExternalSortData.class);
+
+ /** Convert sequence file in to compressed columnar format.
+ *
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception{
+ int mb = 1024*1024;
+ final int blockSize = 1024;
+ ByteBuffer keys = ByteBuffer.allocateDirect(2*mb);
+ ByteBuffer values = ByteBuffer.allocateDirect(20*mb);
+ ByteBuffer fromCompressBuffer = ByteBuffer.allocateDirect(mb);
+ ByteBuffer toCompressBuffer = ByteBuffer.allocateDirect(mb);
+
+ ByteBuffer valueLengthB = ByteBuffer.allocateDirect(blockSize*4);
+ IntBuffer valueLengths = valueLengthB.asIntBuffer();
+ //Opaque value stored as len,data.
+
+ //
+ //Snappy.compress(uncompressed, compressed);
+ String file = "/opt/data/tera1gb/part-00000";
+ Configuration config = new Configuration();
+ //SequenceFile.Reader sf = new SequenceFile.Reader(FileSystem.getLocal(config), new Path(file), config);
+
+ BufferedReader reader = Files.newBufferedReader(FileSystems.getDefault().getPath(file), Charsets.UTF_8);
+
+ CodedOutputStream cos = CodedOutputStream.newInstance(new BBOutputStream(values));
+
+ long originalBytes = 0;
+ long compressedBytes = 0;
+ String l;
+ int round = 0;
+ long nanos = 0;
+ long x1 = System.nanoTime();
+ while((l = reader.readLine()) != null){
+
+ byte[] bytes = l.getBytes();
+ keys.put(bytes, 0, 10);
+ int len = bytes.length - 10;
+ originalBytes += len;
+
+
+ // Compress the value.
+ long n1 = System.nanoTime();
+ fromCompressBuffer.put(bytes, 10, len);
+ fromCompressBuffer.flip();
+ int newLen = Snappy.compress(fromCompressBuffer, toCompressBuffer);
+ cos.writeRawVarint32(newLen);
+ toCompressBuffer.flip();
+ values.put(toCompressBuffer);
+ fromCompressBuffer.clear();
+ toCompressBuffer.clear();
+ nanos += (System.nanoTime() - n1);
+
+ compressedBytes += newLen;
+ //valueLengths.put(newLen);
+
+ round++;
+
+ if(round >= blockSize){
+ // flush
+ keys.clear();
+ values.clear();
+ round = 0;
+
+ }
+
+
+ }
+
+ System.out.println("Uncompressed: " + originalBytes);
+ System.out.println("Compressed: " + compressedBytes);
+ System.out.println("CompressionTime: " + nanos/1000/1000);
+ System.out.println("Total Time: " + (System.nanoTime() - x1)/1000/1000);
+
+ }
+
+ private static void convertToDeltas(IntBuffer b){
+ b.flip();
+ int min = Integer.MAX_VALUE;
+ for(int i =0; i < b.limit(); i++){
+ min = Math.min(b.get(i), min);
+ }
+
+ for(int i =0; i < b.limit(); i++){
+ int cur = b.get(i);
+ b.put(i, cur - min);
+ }
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/column/SimpleExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/column/SimpleExec.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/column/SimpleExec.java
new file mode 100644
index 0000000..e3747e1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/column/SimpleExec.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.column;
+
+import org.junit.Test;
+
+/** Test class whose single test case is currently an empty placeholder. */
+public class SimpleExec {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleExec.class);
+
+ // Placeholder: the body is empty, so this test makes no assertions yet.
+ @Test
+ public void columnarAnd() throws Exception{
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestOpenBitSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestOpenBitSet.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestOpenBitSet.java
new file mode 100644
index 0000000..66f69de
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestOpenBitSet.java
@@ -0,0 +1,361 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.util.BitSet;
+import java.util.Random;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Randomized tests that compare {@link BufBitSet} against {@link java.util.BitSet} as the
+ * reference implementation. The whole class is currently disabled through {@code @Ignore}.
+ */
+@Ignore
+public class TestOpenBitSet {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOpenBitSet.class);
+
+  Random random = new Random();
+  ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
+
+  /** Returns a random value in {@code [val, 2 * val)}. */
+  public int atLeast(int val) {
+    return val + random.nextInt(val);
+  }
+
+  /** Returns the shared random source rather than allocating a new generator on every call. */
+  public Random random() {
+    return random;
+  }
+
+  /** Checks that the int and long {@code get} variants agree with the reference for every index below {@code a.size()}. */
+  void doGet(BitSet a, BufBitSet b) {
+    int max = a.size();
+    for (int i = 0; i < max; i++) {
+      if (a.get(i) != b.get(i)) {
+        fail("mismatch: BitSet=[" + i + "]=" + a.get(i));
+      }
+      if (a.get(i) != b.get((long) i)) {
+        fail("mismatch: BitSet=[" + i + "]=" + a.get(i));
+      }
+    }
+  }
+
+  /** Same as {@link #doGet} but through the {@code fastGet} variants, for indexes below {@code max}. */
+  void doGetFast(BitSet a, BufBitSet b, int max) {
+    for (int i = 0; i < max; i++) {
+      if (a.get(i) != b.fastGet(i)) {
+        fail("mismatch: BitSet=[" + i + "]=" + a.get(i));
+      }
+      if (a.get(i) != b.fastGet((long) i)) {
+        fail("mismatch: BitSet=[" + i + "]=" + a.get(i));
+      }
+    }
+  }
+
+  /** Walks both sets forward with the int {@code nextSetBit} and asserts identical positions. */
+  void doNextSetBit(BitSet a, BufBitSet b) {
+    int aa = -1, bb = -1;
+    do {
+      aa = a.nextSetBit(aa + 1);
+      bb = b.nextSetBit(bb + 1);
+      assertEquals(aa, bb);
+    } while (aa >= 0);
+  }
+
+  /** Walks both sets forward with the long {@code nextSetBit} and asserts identical positions. */
+  void doNextSetBitLong(BitSet a, BufBitSet b) {
+    int aa = -1, bb = -1;
+    do {
+      aa = a.nextSetBit(aa + 1);
+      bb = (int) b.nextSetBit((long) (bb + 1));
+      assertEquals(aa, bb);
+    } while (aa >= 0);
+  }
+
+  /**
+   * Walks both sets backward, starting at or beyond {@code a.size()}, and asserts identical
+   * positions. The reference side is scanned by hand instead of calling a prevSetBit method.
+   */
+  void doPrevSetBit(BitSet a, BufBitSet b) {
+    int aa = a.size() + random().nextInt(100);
+    int bb = aa;
+    do {
+      // aa = a.prevSetBit(aa-1);
+      aa--;
+      while ((aa >= 0) && (!a.get(aa))) {
+        aa--;
+      }
+      bb = b.prevSetBit(bb - 1);
+      assertEquals(aa, bb);
+    } while (aa >= 0);
+  }
+
+  /** Same as {@link #doPrevSetBit} but through the long {@code prevSetBit} variant. */
+  void doPrevSetBitLong(BitSet a, BufBitSet b) {
+    int aa = a.size() + random().nextInt(100);
+    int bb = aa;
+    do {
+      // aa = a.prevSetBit(aa-1);
+      aa--;
+      while ((aa >= 0) && (!a.get(aa))) {
+        aa--;
+      }
+      bb = (int) b.prevSetBit((long) (bb - 1));
+      assertEquals(aa, bb);
+    } while (aa >= 0);
+  }
+
+  // test interleaving different OpenBitSetIterator.next()/skipTo()
+  // Currently a no-op: both iteration modes are commented out below.
+  void doIterate(BitSet a, BufBitSet b, int mode) {
+    // if (mode == 1) doIterate1(a, b);
+    // if (mode == 2) doIterate2(a, b);
+  }
+
+  //
+  // void doIterate1(BitSet a, OpenBitSet b) {
+  // int aa = -1, bb = -1;
+  // OpenBitSetIterator iterator = new OpenBitSetIterator(b);
+  // do {
+  // aa = a.nextSetBit(aa + 1);
+  // bb = random().nextBoolean() ? iterator.nextDoc() : iterator.advance(bb + 1);
+  // assertEquals(aa == -1 ? DocIdSetIterator.NO_MORE_DOCS : aa, bb);
+  // } while (aa >= 0);
+  // }
+  //
+  // void doIterate2(BitSet a, OpenBitSet b) {
+  // int aa = -1, bb = -1;
+  // OpenBitSetIterator iterator = new OpenBitSetIterator(b);
+  // do {
+  // aa = a.nextSetBit(aa + 1);
+  // bb = random().nextBoolean() ? iterator.nextDoc() : iterator.advance(bb + 1);
+  // assertEquals(aa == -1 ? DocIdSetIterator.NO_MORE_DOCS : aa, bb);
+  // } while (aa >= 0);
+  // }
+
+  /**
+   * Builds {@code iter} random pairs of reference/tested sets of size below {@code maxSize},
+   * applies the same mutations to both and asserts that they stay equivalent. Each pair is
+   * also combined (and/or/xor/andNot) with the pair from the previous iteration.
+   *
+   * @param maxSize exclusive upper bound of the random set size
+   * @param iter number of random pairs to generate
+   * @param mode forwarded to {@link #doIterate}
+   */
+  void doRandomSets(int maxSize, int iter, int mode) {
+    BitSet a0 = null;
+    BufBitSet b0 = null;
+
+    for (int i = 0; i < iter; i++) {
+      int sz = random().nextInt(maxSize);
+      BitSet a = new BitSet(sz);
+      BufBitSet b = new BufBitSet(sz, allocator);
+
+      // test the various ways of setting bits
+      if (sz > 0) {
+        int nOper = random().nextInt(sz);
+        for (int j = 0; j < nOper; j++) {
+          int idx;
+
+          idx = random().nextInt(sz);
+          a.set(idx);
+          b.fastSet(idx);
+
+          idx = random().nextInt(sz);
+          a.set(idx);
+          b.fastSet((long) idx);
+
+          idx = random().nextInt(sz);
+          a.clear(idx);
+          b.fastClear(idx);
+
+          idx = random().nextInt(sz);
+          a.clear(idx);
+          b.fastClear((long) idx);
+
+          idx = random().nextInt(sz);
+          a.flip(idx);
+          b.fastFlip(idx);
+
+          // two consecutive flips restore the bit, so only the returned values are checked
+          boolean val = b.flipAndGet(idx);
+          boolean val2 = b.flipAndGet(idx);
+          assertTrue(val != val2);
+
+          idx = random().nextInt(sz);
+          a.flip(idx);
+          b.fastFlip((long) idx);
+
+          val = b.flipAndGet((long) idx);
+          val2 = b.flipAndGet((long) idx);
+          assertTrue(val != val2);
+
+          val = b.getAndSet(idx);
+          assertTrue(val2 == val);
+          assertTrue(b.get(idx));
+
+          // undo the set performed by getAndSet so that b keeps matching a
+          if (!val) b.fastClear(idx);
+          assertTrue(b.get(idx) == val);
+        }
+      }
+
+      // test that the various ways of accessing the bits are equivalent
+      doGet(a, b);
+      doGetFast(a, b, sz);
+
+      // test ranges, including possible extension
+      int fromIndex, toIndex;
+      fromIndex = random().nextInt(sz + 80);
+      toIndex = fromIndex + random().nextInt((sz >> 1) + 1);
+      BitSet aa = (BitSet) a.clone();
+      aa.flip(fromIndex, toIndex);
+      BufBitSet bb = b.cloneTest();
+      bb.flip(fromIndex, toIndex);
+
+      doIterate(aa, bb, mode); // a problem here is from flip or doIterate
+
+      fromIndex = random().nextInt(sz + 80);
+      toIndex = fromIndex + random().nextInt((sz >> 1) + 1);
+      aa = (BitSet) a.clone();
+      aa.clear(fromIndex, toIndex);
+      bb = b.cloneTest();
+      bb.clear(fromIndex, toIndex);
+
+      doNextSetBit(aa, bb); // a problem here is from clear() or nextSetBit
+      doNextSetBitLong(aa, bb);
+
+      doPrevSetBit(aa, bb);
+      doPrevSetBitLong(aa, bb);
+
+      fromIndex = random().nextInt(sz + 80);
+      toIndex = fromIndex + random().nextInt((sz >> 1) + 1);
+      aa = (BitSet) a.clone();
+      aa.set(fromIndex, toIndex);
+      bb = b.cloneTest();
+      bb.set(fromIndex, toIndex);
+
+      doNextSetBit(aa, bb); // a problem here is from set() or nextSetBit
+      doNextSetBitLong(aa, bb);
+
+      doPrevSetBit(aa, bb);
+      doPrevSetBitLong(aa, bb);
+
+      if (a0 != null) {
+        assertEquals(a.equals(a0), b.equals(b0));
+
+        assertEquals(a.cardinality(), b.cardinality());
+
+        BitSet a_and = (BitSet) a.clone();
+        a_and.and(a0);
+        BitSet a_or = (BitSet) a.clone();
+        a_or.or(a0);
+        BitSet a_xor = (BitSet) a.clone();
+        a_xor.xor(a0);
+        BitSet a_andn = (BitSet) a.clone();
+        a_andn.andNot(a0);
+
+        BufBitSet b_and = b.cloneTest();
+        assertEquals(b, b_and);
+        b_and.and(b0);
+        BufBitSet b_or = b.cloneTest();
+        b_or.or(b0);
+        BufBitSet b_xor = b.cloneTest();
+        b_xor.xor(b0);
+        BufBitSet b_andn = b.cloneTest();
+        b_andn.andNot(b0);
+
+        doIterate(a_and, b_and, mode);
+        doIterate(a_or, b_or, mode);
+        doIterate(a_xor, b_xor, mode);
+        doIterate(a_andn, b_andn, mode);
+
+        assertEquals(a_and.cardinality(), b_and.cardinality());
+        assertEquals(a_or.cardinality(), b_or.cardinality());
+        assertEquals(a_xor.cardinality(), b_xor.cardinality());
+        assertEquals(a_andn.cardinality(), b_andn.cardinality());
+
+        // test non-mutating popcounts
+        assertEquals(b_and.cardinality(), BufBitSet.intersectionCount(b, b0));
+        assertEquals(b_or.cardinality(), BufBitSet.unionCount(b, b0));
+        assertEquals(b_xor.cardinality(), BufBitSet.xorCount(b, b0));
+        assertEquals(b_andn.cardinality(), BufBitSet.andNotCount(b, b0));
+      }
+
+      a0 = a;
+      b0 = b;
+    }
+  }
+
+  // large enough to flush obvious bugs, small enough to run in <.5 sec as part of a
+  // larger testsuite.
+  @Test
+  public void testSmall() {
+    doRandomSets(atLeast(1200), atLeast(1000), 1);
+    doRandomSets(atLeast(1200), atLeast(1000), 2);
+  }
+
+  // uncomment to run a bigger test (~2 minutes).
+  /*
+   * public void testBig() { doRandomSets(2000,200000, 1); doRandomSets(2000,200000, 2); }
+   */
+
+  /** Equality must depend on the set bits only, not on the size each set was created with. */
+  @Test
+  public void testEquals() {
+    BufBitSet b1 = new BufBitSet(1111, allocator);
+    BufBitSet b2 = new BufBitSet(2222, allocator);
+    assertTrue(b1.equals(b2));
+    assertTrue(b2.equals(b1));
+    b1.set(10);
+    assertFalse(b1.equals(b2));
+    assertFalse(b2.equals(b1));
+    b2.set(10);
+    assertTrue(b1.equals(b2));
+    assertTrue(b2.equals(b1));
+    b2.set(2221);
+    assertFalse(b1.equals(b2));
+    assertFalse(b2.equals(b1));
+    b1.set(2221);
+    assertTrue(b1.equals(b2));
+    assertTrue(b2.equals(b1));
+
+    // try different type of object
+    assertFalse(b1.equals(new Object()));
+  }
+
+  /** Sets created with different sizes but the same bits must be equal and share a hash code. */
+  @Test
+  public void testHashCodeEquals() {
+    BufBitSet bs1 = new BufBitSet(200, allocator);
+    BufBitSet bs2 = new BufBitSet(64, allocator);
+    bs1.set(3);
+    bs2.set(3);
+    assertEquals(bs1, bs2);
+    assertEquals(bs1.hashCode(), bs2.hashCode());
+  }
+
+  /** Builds a {@link BufBitSet} created with size 64 that has every index in {@code a} set. */
+  private BufBitSet makeOpenBitSet(int[] a) {
+    BufBitSet bs = new BufBitSet(64, allocator);
+    for (int e : a) {
+      bs.set(e);
+    }
+    return bs;
+  }
+
+  /** Builds a reference {@link BitSet} that has every index in {@code a} set. */
+  private BitSet makeBitSet(int[] a) {
+    BitSet bs = new BitSet();
+    for (int e : a) {
+      bs.set(e);
+    }
+    return bs;
+  }
+
+  /** Runs the backward-scan comparison on sets built from the given indexes. */
+  private void checkPrevSetBitArray(int[] a) {
+    BufBitSet obs = makeOpenBitSet(a);
+    BitSet bs = makeBitSet(a);
+    doPrevSetBit(bs, obs);
+  }
+
+  // Annotated so that JUnit 4 discovers it; without @Test this method was never executed.
+  @Test
+  public void testPrevSetBit() {
+    checkPrevSetBitArray(new int[] {});
+    checkPrevSetBitArray(new int[] { 0 });
+    checkPrevSetBitArray(new int[] { 0, 2 });
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
new file mode 100644
index 0000000..c8ce877
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
@@ -0,0 +1,107 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.proto.UserProtos.QueryHandle;
+import org.apache.drill.exec.proto.UserProtos.QueryResultsMode;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.junit.Test;
+
+/**
+ * Round-trip test for the user RPC layer: starts a {@code UserServer} and a
+ * {@code UserClient} in the same JVM, submits batches of queries with a raw body attached
+ * and waits for each returned handle, logging the timings.
+ */
+public class UserRpcTest {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcTest.class);
+
+  /**
+   * Sends {@code batchCount} batches of {@code batchSize} queries, each carrying
+   * {@code bufferSize} bytes, and waits up to two seconds for every response.
+   *
+   * @throws Exception any failure raised while binding, connecting, sending or receiving
+   */
+  @Test
+  public void doBasicRpcTest() throws Exception {
+    final int bufferSize = 25000;
+    final int batchSize = 1000;
+    final int batchCount = 100;
+
+    int sends = 0;
+    int receives = 0;
+    long nanoSend = 0;
+    long nanoReceive = 0;
+
+    try {
+      ByteBufAllocator bb = new PooledByteBufAllocator(true);
+//    ByteBufAllocator bb = UnpooledByteBufAllocator.DEFAULT;
+      UserServer s = new UserServer(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Server-")), null);
+      s.bind(31515);
+
+      logger.debug("Starting user client.");
+      UserClient c = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
+
+      logger.debug("Connecting as client to server.");
+      c.connectAsClient("localhost", 31515);
+
+      @SuppressWarnings("unchecked")
+      DrillRpcFuture<QueryHandle>[] handles = new DrillRpcFuture[batchSize];
+
+      for (int x = 0; x < batchCount; x++) {
+        long s1 = System.nanoTime();
+        for (int i = 0; i < batchSize; i++) {
+          sends++;
+          ByteBuf rawBody = bb.buffer(bufferSize);
+          // Mark the whole (uninitialized) buffer as readable so that bufferSize bytes are sent.
+          rawBody.writerIndex(bufferSize);
+          if (rawBody.readableBytes() != bufferSize) throw new RuntimeException();
+          handles[i] = c.submitQuery(RunQuery.newBuilder().setMode(QueryResultsMode.QUERY_FOR_STATUS).build(), rawBody);
+        }
+
+        long s2 = System.nanoTime();
+
+        for (int i = 0; i < batchSize; i++) {
+          handles[i].checkedGet(2, TimeUnit.SECONDS).getQueryId();
+          receives++;
+        }
+
+        long s3 = System.nanoTime();
+        nanoSend += (s2 - s1);
+        nanoReceive += (s3 - s2);
+        logger.debug("Submission time {}ms, return time {}ms", (s2 - s1) / 1000 / 1000, (s3 - s2) / 1000 / 1000);
+      }
+      // logger.debug("Submitting query.");
+      // DrillRpcFuture<QueryHandle> handleFuture =
+      // c.submitQuery(RunQuery.newBuilder().setMode(QueryResultsMode.QUERY_FOR_STATUS).build());
+      //
+      // logger.debug("Got query id handle of {}", handleFuture.get(2, TimeUnit.SECONDS).getQueryId());
+    } catch (Exception e) {
+      // The exception is passed as the trailing argument so that its stack trace is logged too.
+      logger.error("Exception of type {} occurred while doing test.", e.getClass().getCanonicalName(), e);
+      throw e;
+    } finally {
+      // Computed from the configured constants; 1L forces long arithmetic for the product.
+      long mbsTransferred = (1L * bufferSize * batchSize * batchCount) / 1024 / 1024;
+      double sSend = nanoSend * 1.0d / 1000 / 1000 / 1000;
+      double sReceive = nanoReceive * 1.0d / 1000 / 1000 / 1000;
+      logger.info(String.format("Completed %d sends and %d receives. Total data transferred was %d. Send bw: %f, Receive bw: %f.", sends, receives, mbsTransferred, mbsTransferred*1.0/sSend, mbsTransferred*1.0/sReceive));
+      logger.info("Completed {} sends and {} receives.", sends, receives);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/StartDrillbit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/StartDrillbit.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/StartDrillbit.java
new file mode 100644
index 0000000..7b353df
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/StartDrillbit.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.junit.Test;
+
+/** Test that launches a Drillbit through its {@code main} entry point. */
+public class StartDrillbit {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StartDrillbit.class);
+
+
+ // Invokes Drillbit.main with an empty argument array; no assertions follow the call.
+ @Test public void startDrillbit() throws DrillbitStartupException, InterruptedException{
+ Drillbit.main(new String[0]);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml b/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
new file mode 100644
index 0000000..b79b811
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<configuration>
+
+ <!-- Socket appender (Lilith multiplex appender class) that targets localhost. -->
+ <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+ <Compressing>true</Compressing>
+ <ReconnectionDelay>10000</ReconnectionDelay>
+ <IncludeCallerData>true</IncludeCallerData>
+ <RemoteHosts>localhost</RemoteHosts>
+ </appender>
+
+ <!-- Console appender using the pattern below. -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- The rolling FILE appender below is disabled, as are its appender-ref entries further down. -->
+<!--
+ <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>/logs/test-common.log</file>
+ <encoder>
+ <pattern>%date %level [%thread] %logger{10} [%file:%line] %msg%n</pattern>
+ </encoder>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>/logs/test-common.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxHistory>30</maxHistory>
+ </rollingPolicy>
+ </appender>
+ -->
+ <!-- org.apache.drill loggers: debug level, not propagated to the root logger (additivity=false). -->
+ <logger name="org.apache.drill" additivity="false">
+ <level value="debug" />
+ <appender-ref ref="SOCKET" />
+ <appender-ref ref="STDOUT" />
+<!-- <appender-ref ref="FILE" /> -->
+ </logger>
+
+ <!-- Everything else: error level only. -->
+ <root>
+ <level value="error" />
+ <appender-ref ref="SOCKET" />
+ <appender-ref ref="STDOUT" />
+<!-- <appender-ref ref="FILE" /> -->
+ </root>
+
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/ref/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/pom.xml b/sandbox/prototype/exec/ref/pom.xml
index b253f6b..045a36b 100644
--- a/sandbox/prototype/exec/ref/pom.xml
+++ b/sandbox/prototype/exec/ref/pom.xml
@@ -8,9 +8,9 @@
<groupId>org.apache.drill.exec</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
-
+
<artifactId>ref</artifactId>
-
+
<name>Logical Plan Execution Reference Implementation</name>
<dependencies>
@@ -36,8 +36,9 @@
</exclusion>
</exclusions>
</dependency>
-
+
+
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>