You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:29 UTC
[08/53] [abbrv] Update typing system. Update RPC system. Add
Fragmenting Implementation. Working single node. Distributed failing due to
threading issues.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
new file mode 100644
index 0000000..3f710ed
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+
+public class LocalFragmentHandler implements IncomingFragmentHandler{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalFragmentHandler.class);
+
+ private final IncomingBuffers buffers;
+ private final FragmentRunner runner;
+ private final FragmentHandle handle;
+ private volatile boolean cancel = false;
+
+ public LocalFragmentHandler(FragmentHandle handle, IncomingBuffers buffers, FragmentRunner runner) {
+ super();
+ this.handle = handle;
+ this.buffers = buffers;
+ this.runner = runner;
+ }
+
+ @Override
+ public boolean handle(ConnectionThrottle throttle, RawFragmentBatch batch) throws FragmentSetupException {
+ return buffers.batchArrived(throttle, batch);
+ }
+
+ @Override
+ public FragmentRunner getRunnable() {
+ return runner;
+ }
+
+
+ public FragmentHandle getHandle() {
+ return handle;
+ }
+
+ @Override
+ public void cancel() {
+ cancel = true;
+ }
+
+ @Override
+ public boolean isDone() {
+ return cancel || isDone();
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
new file mode 100644
index 0000000..70d7e93
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -0,0 +1,123 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.fragment;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentLeaf;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.FragmentRunnerListener;
+import org.apache.drill.exec.work.RemotingFragmentRunnerListener;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+
+/**
+ * This handler receives all incoming traffic for a particular FragmentHandle. It will monitor the state of incoming batches
+ */
+public class RemoteFragmentHandler implements IncomingFragmentHandler {
+ private final PlanFragment fragment;
+ private FragmentLeaf root;
+ private final IncomingBuffers buffers;
+ private final FragmentRunnerListener runnerListener;
+ private volatile FragmentRunner runner;
+ private volatile boolean cancel = false;
+ private final FragmentContext context;
+ private final PhysicalPlanReader reader;
+
+ public RemoteFragmentHandler(PlanFragment fragment, DrillbitContext context, BitTunnel foremanTunnel) throws FragmentSetupException{
+ try{
+ this.fragment = fragment;
+ this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+ this.buffers = new IncomingBuffers(root);
+ this.context = new FragmentContext(context, fragment.getHandle(), null, buffers);
+ this.runnerListener = new RemotingFragmentRunnerListener(this.context, foremanTunnel);
+ this.reader = context.getPlanReader();
+
+ }catch(IOException e){
+ throw new FragmentSetupException("Failure while decoding fragment.", e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
+ */
+ @Override
+ public boolean handle(ConnectionThrottle throttle, RawFragmentBatch batch) throws FragmentSetupException {
+ return buffers.batchArrived(throttle, batch);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.drill.exec.work.fragment.FragmentHandler#getRunnable()
+ */
+ @Override
+ public FragmentRunner getRunnable(){
+ synchronized(this){
+ if(runner != null) throw new IllegalStateException("Get Runnable can only be run once.");
+ if(cancel) return null;
+ try {
+ FragmentRoot fragRoot = reader.readFragmentOperator(fragment.getFragmentJson());
+ RootExec exec = ImplCreator.getExec(context, fragRoot);
+ runner = new FragmentRunner(context, exec, runnerListener);
+ return this.runner;
+ } catch (IOException | ExecutionSetupException e) {
+ runnerListener.fail(fragment.getHandle(), "Failure while setting up remote fragment.", e);
+ return null;
+ }
+ }
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.drill.exec.work.fragment.FragmentHandler#cancel()
+ */
+ @Override
+ public void cancel(){
+ synchronized(this){
+ cancel = true;
+ if(runner != null){
+ runner.cancel();
+ }
+ }
+ }
+
+ @Override
+ public FragmentHandle getHandle() {
+ return fragment.getHandle();
+ }
+
+ @Override
+ public boolean isDone() {
+ return cancel || buffers.isDone();
+ }
+
+
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
new file mode 100644
index 0000000..621c7cb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.user;
+
+import java.util.UUID;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserProtos.RequestResults;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.foreman.Foreman;
+
+public class UserWorker{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
+
+ private final WorkerBee bee;
+
+ public UserWorker(WorkerBee bee) {
+ super();
+ this.bee = bee;
+ }
+
+ public QueryId submitWork(UserClientConnection connection, RunQuery query){
+ UUID uuid = UUID.randomUUID();
+ QueryId id = QueryId.newBuilder().setPart1(uuid.getMostSignificantBits()).setPart2(uuid.getLeastSignificantBits()).build();
+ Foreman foreman = new Foreman(bee, bee.getContext(), connection, id, query);
+ bee.addNewForeman(foreman);
+ return id;
+ }
+
+ public QueryResult getResult(UserClientConnection connection, RequestResults req){
+ Foreman foreman = bee.getForemanForQueryId(req.getQueryId());
+ if(foreman == null) return QueryResult.newBuilder().setQueryState(QueryState.UNKNOWN_QUERY).build();
+ return foreman.getResult(connection, req);
+ }
+
+ public Ack cancelQuery(QueryId query){
+ Foreman foreman = bee.getForemanForQueryId(query);
+ if(foreman != null){
+ foreman.cancel();
+ }
+ return Acks.OK;
+ }
+
+ public Ack cancelFragment(FragmentHandle handle){
+ FragmentRunner runner = bee.getFragmentRunner(handle);
+ if(runner != null) runner.cancel();
+ return Acks.OK;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
new file mode 100644
index 0000000..5cc5cab
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
@@ -0,0 +1,26 @@
+package exec;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "CoordinationProtos";
+option optimize_for = SPEED;
+
+message DrillbitEndpoint{
+ optional string address = 1;
+ optional int32 user_port = 2;
+ optional int32 bit_port = 3;
+ optional Roles roles = 4;
+}
+
+message DrillServiceInstance{
+ optional string id = 1;
+ optional int64 registrationTimeUTC = 2;
+ optional DrillbitEndpoint endpoint = 3;
+}
+
+message Roles{
+ optional bool sql_query = 1 [default = true];
+ optional bool logical_plan = 2 [default = true];
+ optional bool physical_plan = 3 [default = true];
+ optional bool java_executor = 4 [default = true];
+ optional bool distributed_cache = 5 [default = true];
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
index 77a7ee1..7501d7c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
@@ -2,9 +2,11 @@ package exec.bit;
option java_package = "org.apache.drill.exec.proto";
option java_outer_classname = "ExecProtos";
-option optimize_for = LITE_RUNTIME;
-import "SchemaDef.proto";
+option optimize_for = SPEED;
+
import "Coordination.proto";
+import "UserBitShared.proto";
+
////// UserToBit RPC ///////
@@ -16,7 +18,7 @@ enum RpcType {
// bit requests
REQ_INIATILIZE_FRAGMENT = 3; // Returns Handle
REQ_RECORD_BATCH = 4; // send record batch overview, returns Ack
- REQ_BATCH_CHUNK = 5; // send additional batch chunk, returns Ack.
+
REQ_CANCEL_FRAGMENT = 6; // send a cancellation message for a fragment, returns Ack
REQ_FRAGMENT_STATUS = 7; // get a fragment status, returns FragmentStatus
REQ_BIT_STATUS = 8; // get bit status.
@@ -25,34 +27,29 @@ enum RpcType {
RESP_FRAGMENT_HANDLE = 9;
RESP_FRAGMENT_STATUS = 10;
RESP_BIT_STATUS = 11;
- RESP_BATCH_CHUNK = 12;
}
message BitHandshake{
- optional DrillbitEndpoint endpoint = 1;
+ optional int32 rpc_version = 1;
+ optional DrillbitEndpoint endpoint = 2;
}
message BitBatchChunk {}
message BitStatus {
- repeated ActiveFragment fragment = 1;
-}
-
-message ActiveFragment {
- optional FragmentStatus status = 1;
- optional int64 fragment_id = 2;
- optional int64 query_id = 3;
+ repeated FragmentStatus fragment_status = 1;
}
message FragmentStatus {
enum FragmentState {
- AWAITING_ALLOCATION = 0;
- RUNNING = 1;
- FINISHED = 2;
- CANCELLED = 3;
- FAILED = 4;
+ SENDING = 0;
+ AWAITING_ALLOCATION = 1;
+ RUNNING = 2;
+ FINISHED = 3;
+ CANCELLED = 4;
+ FAILED = 5;
}
optional int64 memory_use = 1;
@@ -61,27 +58,37 @@ message FragmentStatus {
optional int32 estimated_completion_percentage = 4;
optional FragmentState state = 5;
optional int64 data_processed = 6;
+
+ optional FragmentHandle handle = 7;
+ optional exec.shared.DrillPBError error = 8;
+ optional int64 running_time = 9;
}
-message RecordBatchHeader {
+message FragmentRecordBatch{
+ optional FragmentHandle handle = 1;
+ optional int32 sending_major_fragment_id = 2;
+ optional int32 sending_minor_fragment_id = 3;
+ optional exec.shared.RecordBatchDef def = 4;
+ optional bool isLastBatch = 5;
}
message PlanFragment {
- optional int64 query_id = 1;
- optional int32 major_fragment_id = 2;
- optional int32 minor_fragment_id = 3;
+ optional FragmentHandle handle = 1;
optional float network_cost = 4;
optional float cpu_cost = 5;
optional float disk_cost = 6;
optional float memory_cost = 7;
optional string fragment_json = 8;
- optional bool self_driven = 9;
optional DrillbitEndpoint assignment = 10;
+ optional bool leaf_fragment = 9;
+ optional DrillbitEndpoint foreman = 11;
+
}
message FragmentHandle {
- optional int32 major_fragment_id = 1;
- optional int32 minor_fragment_id = 1;
+ optional exec.shared.QueryId query_id = 1;
+ optional int32 major_fragment_id = 2;
+ optional int32 minor_fragment_id = 3;
}
message WorkQueueStatus{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
index ebc7dca..48011bf 100644
--- a/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
@@ -2,7 +2,9 @@ package exec.rpc;
option java_package = "org.apache.drill.exec.proto";
option java_outer_classname = "GeneralRPCProtos";
-option optimize_for = LITE_RUNTIME;
+option optimize_for = SPEED;
+
+import "Coordination.proto";
message Ack{
optional bool ok = 1;
@@ -33,3 +35,5 @@ message RpcFailure {
optional string short_error = 3;
optional string long_error = 4;
}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
index 6e983d4..de0009a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
@@ -2,29 +2,62 @@ package exec;
option java_package = "org.apache.drill.exec.proto";
option java_outer_classname = "SchemaDefProtos";
-option optimize_for = LITE_RUNTIME;
+option optimize_for = SPEED;
// Schema Definitions //
-enum DataType {
- LATE = 0;
- INT32 = 1;
- INT64 = 2;
- FLOAT32 = 3;
- FLOAT64 = 4;
- UTF8 = 5;
- BYTES = 6;
+enum MinorType {
+ LATE = 0; // late binding type
+ MAP = 1; // an empty map column. Useful for conceptual setup. Children listed within here
+ REPEATMAP = 2; // a repeated map column (means that multiple children sit below this)
+ TINYINT = 3; // single byte signed integer
+ SMALLINT = 4; // two byte signed integer
+ INT = 5; // four byte signed integer
+ BIGINT = 6; // eight byte signed integer
+ DECIMAL4 = 7; // a decimal supporting precision between 1 and 8 (4 bits for decimal location, 1 sign)
+ DECIMAL8 = 8; // a decimal supporting precision between 9 and 18 (5 bits for decimal location, 1 sign)
+ DECIMAL12 = 9; // a decimal supporting precision between19 and 28 (5 bits for decimal location, 1 sign)
+ DECIMAL16 = 10; // a decimal supporting precision between 29 and 37 (6 bits for decimal location, 1 sign)
+ MONEY = 11; // signed decimal with two digit precision
+ DATE = 12; // days since 4713bc
+ TIME = 13; // time in micros before or after 2000/1/1
+ TIMETZ = 14; // time in micros before or after 2000/1/1 with timezone
+ TIMESTAMP = 15; // unix epoch time in millis
+ DATETIME = 16; // TBD
+ INTERVAL = 17; // TBD
+ FLOAT4 = 18; // 4 byte ieee 754
+ FLOAT8 = 19; // 8 byte ieee 754
+ BOOLEAN = 20; // single bit value
+ FIXEDCHAR = 21; // utf8 fixed length string, padded with spaces
+ VARCHAR1 = 22; // utf8 variable length string (up to 2^8 in length)
+ VARCHAR2 = 23; // utf8 variable length string (up to 2^16 in length)
+ VARCHAR4 = 24; // utf8 variable length string (up to 2^32 in length)
+ FIXEDBINARY = 25; // fixed length binary, padded with 0 bytes
+ VARBINARY1 = 26; // variable length binary (up to 2^8 in length)
+ VARBINARY2 = 27; // variable length binary (up to 2^16 in length)
+ VARBINARY4 = 28; // variable length binary (up to 2^32 in length)
+ UINT1 = 29; // unsigned 1 byte integer
+ UINT2 = 30; // unsigned 2 byte integer
+ UINT4 = 31; // unsigned 4 byte integer
+ UINT8 = 32; // unsigned 8 byte integer
+ PROTO2 = 33; // protobuf encoded complex type. (up to 2^16 in length)
+ PROTO4 = 34; // protobuf encoded complex type. (up to 2^32 in length)
+ MSGPACK2 = 35; // msgpack encoded complex type. (up to 2^16 in length)
+ MSGPACK4 = 36; // msgpack encoded complex type. (up to 2^32 in length)
}
-enum DataMode {
- REQUIRED = 0;
- OPTIONAL = 1;
- REPEATED = 2;
- MAP = 3;
+message MajorType {
+ optional MinorType minor_type = 1;
+ optional DataMode mode = 2;
+ optional int32 width = 3; // optional width for fixed size values.
+ optional int32 precision = 4; // used for decimal types
+ optional int32 scale = 5; // used for decimal types
}
-message SchemaDef {
- repeated FieldDef field = 1;
+enum DataMode {
+ OPTIONAL = 0; // nullable
+ REQUIRED = 1; // non-nullable
+ REPEATED = 2; // single, repeated-field
}
enum ValueMode {
@@ -33,12 +66,21 @@ enum ValueMode {
DICT = 2;
}
+message NamePart {
+ enum Type{
+ NAME = 0;
+ ARRAY = 1;
+ }
+
+ optional Type type = 1;
+ optional string name = 2; // only required if this is a named type.
+}
+
message FieldDef {
- optional string name = 1;
- optional DataMode data_mode = 2;
- optional ValueMode value_mode = 3;
+ optional int32 field_id = 1;
+ optional int32 parent_id = 2; // the field_id of the parent of this field. populated when this is a repeated field. a field_id of 0 means that the record is the parent of this repeated field.
+ repeated NamePart name = 3; // multipart description of entire field name
+ optional MajorType major_type = 4; // the type associated with this field.
+ repeated FieldDef field = 5; // only in the cases of type == MAP or REPEATMAP
- // If DataMode == 0-2, type should be populated and fields should be empty. Otherwise, type should empty and fields should be defined.
- optional DataType type = 4;
- repeated FieldDef fields = 5;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
index 225d1a0..cbf5b4c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
@@ -2,8 +2,12 @@ package exec.user;
option java_package = "org.apache.drill.exec.proto";
option java_outer_classname = "UserProtos";
-option optimize_for = LITE_RUNTIME;
+option optimize_for = SPEED;
+
import "SchemaDef.proto";
+import "UserBitShared.proto";
+
+
////// UserToBit RPC ///////
enum RpcType {
@@ -13,7 +17,8 @@ enum RpcType {
// user to bit
RUN_QUERY = 3;
- REQUEST_RESULTS = 4;
+ CANCEL_QUERY = 4;
+ REQUEST_RESULTS = 5;
// bit to user
QUERY_RESULT = 6;
@@ -21,33 +26,36 @@ enum RpcType {
}
message UserToBitHandshake {
- optional bool support_listening = 1;
- optional int32 rpc_version = 2;
+ optional bool support_listening = 2;
+ optional int32 rpc_version = 3;
}
message RequestResults {
- optional int64 query_id = 1;
+ optional exec.shared.QueryId query_id = 1;
optional int32 maximum_responses = 2;
}
message RunQuery {
- optional QueryResultsMode mode = 1;
- optional string plan = 2;
+ optional QueryResultsMode results_mode = 1;
+ optional QueryType type = 2;
+ optional string plan = 3;
+}
+
+enum QueryType {
+ SQL = 1;
+ LOGICAL = 2;
+ PHYSICAL = 3;
}
enum QueryResultsMode {
STREAM_FULL = 1; // Server will inform the client regularly on the status of the query. Once the query is completed, service will inform the client as each query chunk is made available.
- STREAM_FIRST = 2; // Server will inform the client regularly on the status of the query. Once the query is completed, server will inform the client of the first query chunk.
- QUERY_FOR_STATUS = 3; // Client will need to query for status of query.
+ // STREAM_FIRST = 2; // Server will inform the client regularly on the status of the query. Once the query is completed, server will inform the client of the first query chunk.
+ // QUERY_FOR_STATUS = 3; // Client will need to query for status of query.
}
message BitToUserHandshake {
- optional int32 rpc_version = 1;
-}
-
-message QueryHandle {
- optional int64 query_id = 1;
+ optional int32 rpc_version = 2;
}
message NodeStatus {
@@ -56,37 +64,26 @@ message NodeStatus {
}
message QueryResult {
- enum Outcome {
- RUNNING = 0;
- FAILED = 1;
+ enum QueryState {
+ PENDING = 0;
+ RUNNING = 1;
COMPLETED = 2;
- WAITING = 3;
+ CANCELED = 3;
+ FAILED = 4;
+ UNKNOWN_QUERY = 5;
}
- optional Outcome outcome = 1;
- optional SchemaDef schema = 2;
+ optional QueryState query_state = 1;
+ optional exec.shared.QueryId query_id = 2;
optional bool is_last_chunk = 3;
optional int32 row_count = 4;
optional int64 records_scan = 5;
optional int64 records_error = 6;
optional int64 submission_time = 7;
repeated NodeStatus node_status = 8;
- repeated Error error = 9;
-}
-
-message TextErrorLocation{
- optional int32 start_column = 2;
- optional int32 start_row = 3;
- optional int32 end_column = 4;
- optional int32 end_row = 5;
-}
-
-message Error{
- optional int64 error_id = 1; // for debug tracing purposes
- optional string host = 2;
- optional int32 error_type = 3;
- optional string message = 4;
- optional TextErrorLocation error = 5; //optional, used when providing location of error within a piece of text.
+ repeated exec.shared.DrillPBError error = 9;
+ optional exec.shared.RecordBatchDef def = 10;
+ optional bool schema_changed = 11;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/UserBitShared.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/UserBitShared.proto
new file mode 100644
index 0000000..5643c0f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/UserBitShared.proto
@@ -0,0 +1,46 @@
+package exec.shared;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "UserBitShared";
+option optimize_for = SPEED;
+
+import "Coordination.proto";
+import "SchemaDef.proto";
+
+message QueryId {
+ optional sfixed64 part1 = 1;
+ optional sfixed64 part2 = 2;
+}
+
+message DrillPBError{
+ optional string error_id = 1; // for debug tracing purposes
+ optional DrillbitEndpoint endpoint = 2;
+ optional int32 error_type = 3;
+ optional string message = 4;
+ repeated ParsingError parsing_error = 5; //optional, used when providing location of error within a piece of text.
+}
+
+message ParsingError{
+ optional int32 start_column = 2;
+ optional int32 start_row = 3;
+ optional int32 end_column = 4;
+ optional int32 end_row = 5;
+}
+
+message RecordBatch{
+
+}
+
+message RecordBatchDef {
+ repeated FieldMetadata field = 1;
+ optional int32 record_count = 2;
+
+}
+
+message FieldMetadata {
+ optional FieldDef def = 1;
+ optional int32 value_count = 2;
+ optional int32 var_byte_length = 3;
+ optional int32 group_count = 4; // number of groups. (number of repeated records)
+ optional int32 buffer_length = 5;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
index 37ba12b..d113ca3 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
@@ -103,4 +103,11 @@ public class DrillSystemTestBase {
}
}
+ public Drillbit getABit(){
+ return this.servers.iterator().next();
+ }
+
+ public static DrillConfig getConfig(){
+ return config;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
index 09a06d7..dc463e3 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
@@ -1,17 +1,20 @@
package org.apache.drill.exec.client;
-import com.google.common.base.Charsets;
-import com.google.common.io.Resources;
+import java.util.List;
+
import org.apache.drill.exec.DrillSystemTestBase;
-import org.apache.drill.exec.proto.UserProtos;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.junit.After;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
-/**
- * @author David Alves
- */
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+
+
+@Ignore
public class DrillClientSystemTest extends DrillSystemTestBase {
private static String plan;
@@ -34,8 +37,8 @@ public class DrillClientSystemTest extends DrillSystemTestBase {
startCluster(1);
DrillClient client = new DrillClient();
client.connect();
- DrillRpcFuture<UserProtos.QueryHandle> result = client.submitPlan(plan);
- System.out.println(result.get());
+ List<QueryResultBatch> result = client.runQuery(QueryType.LOGICAL, plan);
+ System.out.println(result);
client.close();
}
@@ -45,8 +48,8 @@ public class DrillClientSystemTest extends DrillSystemTestBase {
startCluster(2);
DrillClient client = new DrillClient();
client.connect();
- DrillRpcFuture<UserProtos.QueryHandle> result = client.submitPlan(plan);
- System.out.println(result.get());
+ List<QueryResultBatch> result = client.runQuery(QueryType.LOGICAL, plan);
+ System.out.println(result);
client.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassCompilationTypes.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassCompilationTypes.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassCompilationTypes.java
index 3d5d84e..2f8aa18 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassCompilationTypes.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassCompilationTypes.java
@@ -18,14 +18,15 @@
package org.apache.drill.exec.compile;
import org.codehaus.commons.compiler.jdk.ExpressionEvaluator;
+import org.junit.Ignore;
import org.junit.Test;
public class TestClassCompilationTypes {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestClassCompilationTypes.class);
- @Test
+ @Ignore @Test
public void comparePerfs() throws Exception {
- for(int i =0; i < 50000; i++){
+ for(int i =0; i < 500; i++){
int r = 0;
long n0 = System.nanoTime();
r += janino();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/config/ParsePhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/config/ParsePhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/config/ParsePhysicalPlan.java
new file mode 100644
index 0000000..3b6bf6a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/config/ParsePhysicalPlan.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class ParsePhysicalPlan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParsePhysicalPlan.class);
+
+
+ @Test
+ public void parseSimplePlan() throws Exception{
+ DrillConfig c = DrillConfig.create();
+ ObjectReader r = c.getMapper().reader(PhysicalPlan.class);
+ ObjectWriter writer = c.getMapper().writer();
+ PhysicalPlan plan = PhysicalPlan.parse(r, Files.toString(FileUtils.getResourceAsFile("/physical_test1.json"), Charsets.UTF_8));
+ System.out.println(plan.unparse(writer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
new file mode 100644
index 0000000..7c6bfe5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+@Ignore
+public class DistributedFragmentRun extends PopUnitTestBase{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedFragmentRun.class);
+
+
+ @Test
+ public void simpleDistributedQuery() throws Exception{
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){
+ bit1.run();
+ bit2.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_single_exchange.json"), Charsets.UTF_8));
+ System.out.println(results);
+ }
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleFragmentRun.java
new file mode 100644
index 0000000..6755bb6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleFragmentRun.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.vector.ValueVector;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Test;
+
+import com.carrotsearch.hppc.cursors.IntObjectCursor;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class SimpleFragmentRun extends PopUnitTestBase {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleFragmentRun.class);
+
+ @Test
+ public void runNoExchangeFragment() throws Exception {
+ try(RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ Drillbit bit = new Drillbit(CONFIG, serviceSet);
+ DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){
+
+ // run query.
+ bit.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_test2.json"), Charsets.UTF_8));
+
+ // look at records
+ RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+ int recordCount = 0;
+ for (QueryResultBatch batch : results) {
+ if(!batch.hasData()) continue;
+ boolean schemaChanged = batchLoader.load(batch.getHeader().getDef(), batch.getData());
+ boolean firstColumn = true;
+
+ // print headers.
+ if (schemaChanged) {
+ System.out.println("\n\n========NEW SCHEMA=========\n\n");
+ for (IntObjectCursor<ValueVector<?>> v : batchLoader) {
+
+ if (firstColumn) {
+ firstColumn = false;
+ } else {
+ System.out.print("\t");
+ }
+ System.out.print(v.value.getField().getName());
+ System.out.print("[");
+ System.out.print(v.value.getField().getType().getMinorType());
+ System.out.print("]");
+ }
+ System.out.println();
+ }
+
+
+ for (int i = 0; i < batchLoader.getRecordCount(); i++) {
+ boolean first = true;
+ recordCount++;
+ for (IntObjectCursor<ValueVector<?>> v : batchLoader) {
+ if (first) {
+ first = false;
+ } else {
+ System.out.print("\t");
+ }
+ System.out.print(v.value.getObject(i));
+ }
+ if(!first) System.out.println();
+ }
+
+ }
+ logger.debug("Received results {}", results);
+ assertEquals(recordCount, 200);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
index 98bb874..7b7ab8e 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
@@ -24,63 +24,63 @@ import static org.junit.Assert.assertNull;
import java.io.IOException;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.physical.PhysicalPlan;
-import org.apache.drill.common.physical.pop.base.PhysicalOperator;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.planner.FragmentNode;
-import org.apache.drill.exec.planner.FragmentingPhysicalVisitor;
-import org.apache.drill.exec.planner.FragmentNode.ExchangeFragmentPair;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.SimpleExecPlanner;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.QueryWorkUnit;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
-public class CheckFragmenter {
+public class CheckFragmenter extends PopUnitTestBase {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CheckFragmenter.class);
-
- static DrillConfig config;
-
- @BeforeClass
- public static void setup(){
- config = DrillConfig.create();
- }
-
+
+
@Test
- public void ensureOneFragment() throws FragmentSetupException, IOException{
- FragmentNode b = getRootFragment("/physical_test1.json");
+ public void ensureOneFragment() throws FragmentSetupException, IOException {
+ PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
+ Fragment b = getRootFragment(ppr, "/physical_test1.json");
assertEquals(1, getFragmentCount(b));
assertEquals(0, b.getReceivingExchangePairs().size());
assertNull(b.getSendingExchange());
}
-
+
@Test
- public void ensureTwoFragments() throws FragmentSetupException, IOException{
- FragmentNode b = getRootFragment("/physical_simpleexchange.json");
- assertEquals(2, getFragmentCount(b));
+ public void ensureThreeFragments() throws FragmentSetupException, IOException {
+ PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
+ Fragment b = getRootFragment(ppr, "/physical_simpleexchange.json");
+ logger.debug("Fragment Node {}", b);
+ assertEquals(3, getFragmentCount(b));
assertEquals(1, b.getReceivingExchangePairs().size());
assertNull(b.getSendingExchange());
-
+
// get first child.
b = b.iterator().next().getNode();
+ assertEquals(1, b.getReceivingExchangePairs().size());
+ assertNotNull(b.getSendingExchange());
+
+ b = b.iterator().next().getNode();
assertEquals(0, b.getReceivingExchangePairs().size());
assertNotNull(b.getSendingExchange());
}
+
- private int getFragmentCount(FragmentNode b){
- int i =1;
- for(ExchangeFragmentPair p : b){
- i += getFragmentCount(p.getNode());
- }
- return i;
- }
+
+
- private FragmentNode getRootFragment(String file) throws FragmentSetupException, IOException{
- FragmentingPhysicalVisitor f = new FragmentingPhysicalVisitor();
-
- PhysicalPlan plan = PhysicalPlan.parse(config.getMapper().reader(PhysicalPlan.class), Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
- PhysicalOperator o = plan.getSortedOperators(false).iterator().next();
- return o.accept(f, null);
- }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckInjectionValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckInjectionValue.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckInjectionValue.java
index b8fd278..1d0fb91 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckInjectionValue.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckInjectionValue.java
@@ -22,12 +22,12 @@ import static org.junit.Assert.*;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.physical.PhysicalPlan;
-import org.apache.drill.common.physical.pop.Screen;
-import org.apache.drill.common.physical.pop.base.PhysicalOperator;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StorageEngineRegistry;
import org.junit.BeforeClass;
@@ -48,8 +48,8 @@ public class CheckInjectionValue {
@Test
public void testInjected() throws Exception{
- PhysicalPlanReader r = new PhysicalPlanReader(config.getMapper(), DrillbitEndpoint.getDefaultInstance());
- PhysicalPlan p = r.read(Files.toString(FileUtils.getResourceAsFile("/physical_screen.json"), Charsets.UTF_8));
+ PhysicalPlanReader r = new PhysicalPlanReader(config, config.getMapper(), DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan p = r.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/physical_screen.json"), Charsets.UTF_8));
List<PhysicalOperator> o = p.getSortedOperators(false);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
new file mode 100644
index 0000000..6f229a3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.pop;
+
+import static org.junit.Assert.*;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class FragmentChecker extends PopUnitTestBase{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentChecker.class);
+
+
+ @Test
+ public void checkSimpleExchangePlan() throws Exception{
+
+ PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
+ Fragment fragmentRoot = getRootFragment(ppr, "/physical_simpleexchange.json");
+ PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
+ SimpleParallelizer par = new SimpleParallelizer();
+
+ DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(1234).build();
+ DrillbitEndpoint b2 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(2345).build();
+
+ QueryWorkUnit qwu = par.getFragments(b1, QueryId.getDefaultInstance(), Lists.newArrayList(b1, b2), ppr, fragmentRoot, planningSet, 10);
+ assertEquals(qwu.getFragments().size(), 3);
+ System.out.println("=========ROOT FRAGMENT=========");
+ System.out.print(qwu.getRootFragment().getFragmentJson());
+
+
+ for(PlanFragment f : qwu.getFragments()){
+ System.out.println("=========");
+ System.out.print(f.getFragmentJson());
+ }
+ logger.debug("Planning Set {}", planningSet);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
new file mode 100644
index 0000000..e5e109e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.pop;
+
+import java.io.IOException;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public abstract class PopUnitTestBase {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PopUnitTestBase.class);
+
+ // Determine if we are in Eclipse Debug mode.
+ static final boolean IS_DEBUG = java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;
+ protected static DrillConfig CONFIG;
+
+ // Set a timeout unless we're debugging.
+ @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(10000);
+
+ @BeforeClass
+ public static void setup() {
+ CONFIG = DrillConfig.create();
+ }
+
+
+ public static int getFragmentCount(Fragment b) {
+ int i = 1;
+ for (ExchangeFragmentPair p : b) {
+ i += getFragmentCount(p.getNode());
+ }
+ return i;
+ }
+
+ public Fragment getRootFragment(PhysicalPlanReader reader, String file) throws FragmentSetupException, IOException {
+ MakeFragmentsVisitor f = new MakeFragmentsVisitor();
+
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
+ PhysicalOperator o = plan.getSortedOperators(false).iterator().next();
+ return o.accept(f, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/RunRemoteQuery.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/RunRemoteQuery.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/RunRemoteQuery.java
deleted file mode 100644
index d003373..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/RunRemoteQuery.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.rpc.user;
-
-import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-import org.apache.drill.exec.proto.UserProtos.QueryHandle;
-import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.junit.Test;
-
-public class RunRemoteQuery {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RunRemoteQuery.class);
-
- @Test
- public void runRemoteQuery() throws Exception{
- UserClient c = new UserClient(UnpooledByteBufAllocator.DEFAULT, new NioEventLoopGroup(1));
- c.connectAsClient("localhost", 31010);
- DrillRpcFuture<QueryHandle> futureHandle = c.submitQuery(RunQuery.getDefaultInstance());
- QueryHandle h = futureHandle.checkedGet();
- System.out.println(h);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
deleted file mode 100644
index c8ce877..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.rpc.user;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.exec.proto.UserProtos.QueryHandle;
-import org.apache.drill.exec.proto.UserProtos.QueryResultsMode;
-import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.junit.Test;
-
-public class UserRpcTest {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcTest.class);
-
-
-
-
- @Test
- public void doBasicRpcTest() throws Exception {
- final int bufferSize = 25000;
- final int batchSize = 1000;
- final int batchCount = 100;
-
-
- int sends = 0;
- int receives = 0;
- long nanoSend = 0;
- long nanoReceive = 0;
-
-
- try {
- ByteBufAllocator bb = new PooledByteBufAllocator(true);
-// ByteBufAllocator bb = UnpooledByteBufAllocator.DEFAULT;
- UserServer s = new UserServer(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Server-")), null);
- s.bind(31515);
-
- logger.debug("Starting user client.");
- UserClient c = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
-
- logger.debug("Connecting as client to server.");
- c.connectAsClient("localhost", 31515);
-
-
- @SuppressWarnings("unchecked")
- DrillRpcFuture<QueryHandle>[] handles = new DrillRpcFuture[batchSize];
-
- for (int x = 0; x < batchCount; x++) {
- long s1 = System.nanoTime();
- for (int i = 0; i < batchSize; i++) {
- sends++;
- ByteBuf rawBody = bb.buffer(bufferSize);
- rawBody.writerIndex(bufferSize);
- if(rawBody.readableBytes() != bufferSize) throw new RuntimeException();
- handles[i] = c.submitQuery(RunQuery.newBuilder().setMode(QueryResultsMode.QUERY_FOR_STATUS).build(), rawBody);
- }
-
- long s2 = System.nanoTime();
-
- for (int i = 0; i < batchSize; i++) {
- handles[i].checkedGet(2, TimeUnit.SECONDS).getQueryId();
- receives++;
- }
-
- long s3 = System.nanoTime();
- nanoSend += (s2-s1);
- nanoReceive += (s3-s2);
- logger.debug("Submission time {}ms, return time {}ms", (s2 - s1) / 1000 / 1000, (s3 - s2) / 1000 / 1000);
- }
- // logger.debug("Submitting query.");
- // DrillRpcFuture<QueryHandle> handleFuture =
- // c.submitQuery(RunQuery.newBuilder().setMode(QueryResultsMode.QUERY_FOR_STATUS).build());
- //
- // logger.debug("Got query id handle of {}", handleFuture.get(2, TimeUnit.SECONDS).getQueryId());
- } catch (Exception e) {
- logger.error("Exception of type {} occurred while doing test.", e.getClass().getCanonicalName());
- throw e;
- } finally{
- long mbsTransferred = (1l * bufferSize * batchSize * batchCount)/1024/1024;
- double sSend = nanoSend*1.0d/1000/1000/1000;
- double sReceive = nanoReceive*1.0d/1000/1000/1000;
- logger.info(String.format("Completed %d sends and %d receives. Total data transferred was %d. Send bw: %f, Receive bw: %f.", sends, receives, mbsTransferred, mbsTransferred*1.0/sSend, mbsTransferred*1.0/sReceive));
- logger.info("Completed {} sends and {} receives.", sends, receives);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
new file mode 100644
index 0000000..9684e9f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitClient;
+import org.apache.drill.exec.rpc.bit.BitComImpl;
+import org.apache.drill.exec.rpc.bit.BitConnection;
+import org.apache.drill.exec.rpc.bit.BitRpcConfig;
+import org.apache.drill.exec.rpc.bit.BitServer;
+import org.apache.drill.exec.rpc.bit.ListenerPool;
+import org.apache.drill.exec.work.batch.BitComHandler;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestBitRpc {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class);
+
+ @Test
+ public void testBasicConnectionAndHandshake() throws Exception{
+ int port = 1234;
+ BootStrapContext c = new BootStrapContext(DrillConfig.create());
+ ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap();
+ BitServer server = new BitServer(new BitComTestHandler(), c, registry, new ListenerPool(2));
+ port = server.bind(port);
+ for(int i =0; i < 10; i++){
+ BitClient client = new BitClient(DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(port).build(), null, new BitComTestHandler(), c, registry, new ListenerPool(2));
+ client.connect();
+
+ }
+ System.out.println("connected");
+ }
+
+
+
+ private class BitComTestHandler implements BitComHandler{
+
+ @Override
+ public Response handle(BitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ return BitRpcConfig.OK;
+ }
+
+ @Override
+ public void startNewRemoteFragment(PlanFragment fragment) {
+ }
+
+ @Override
+ public Ack cancelFragment(FragmentHandle handle) {
+ return null;
+ }
+
+ @Override
+ public void registerIncomingFragmentHandler(IncomingFragmentHandler handler) {
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordConfig.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordConfig.java
deleted file mode 100644
index 18c6955..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordConfig.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.logical.StorageEngineConfigBase;
-
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("mock-config")
-public class MockRecordConfig extends StorageEngineConfigBase{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordConfig.class);
-
- private int recordCount;
- private DataType[] types;
-
- public int getRecordCount() {
- return recordCount;
- }
- public void setRecordCount(int recordCount) {
- this.recordCount = recordCount;
- }
- public DataType[] getTypes() {
- return types;
- }
- public void setTypes(DataType[] types) {
- this.types = types;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordReader.java
deleted file mode 100644
index e1f56bd..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordReader.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.exec.exception.ExecutionSetupException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OutputMutator;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.vector.Int16Vector;
-import org.apache.drill.exec.record.vector.Int32Vector;
-import org.apache.drill.exec.record.vector.ValueVector;
-
-public class MockRecordReader implements RecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
-
- private BatchSchema expectedSchema;
- private OutputMutator output;
- private MockRecordConfig config;
- private FragmentContext context;
- private ValueVector<?>[] valueVectors;
- private int recordsRead;
-
- public MockRecordReader(FragmentContext context, MockRecordConfig config) {
- this.config = config;
- }
-
- private int getEstimatedRecordSize(DataType[] types) {
- int x = 0;
- for (int i = 0; i < types.length; i++) {
- x += getEstimatedColumnSize(i);
- }
- return x;
- }
-
- private int getEstimatedColumnSize(int fieldId) {
- return 4;
- }
-
- private ValueVector<?> getVector(int fieldId, DataType dt, int length) {
- ValueVector<?> v;
- if (dt == DataType.INT16) {
- v = new Int16Vector(fieldId, context.getAllocator());
- } else if (dt == DataType.INT32) {
- v = new Int32Vector(fieldId, context.getAllocator());
- } else {
- throw new UnsupportedOperationException();
- }
- v.allocateNew(length);
- return v;
-
- }
-
- @Override
- public void setup(BatchSchema expectedSchema, OutputMutator output) throws ExecutionSetupException {
- try {
- this.expectedSchema = expectedSchema;
- this.output = output;
- int estimateRowSize = getEstimatedRecordSize(config.getTypes());
- valueVectors = new ValueVector<?>[config.getTypes().length];
- int batchRecordCount = 250000 / estimateRowSize;
-
- for (int i = 0; i < config.getTypes().length; i++) {
- valueVectors[i] = getVector(i, config.getTypes()[i], batchRecordCount);
- output.addField(i, valueVectors[i]);
- }
- } catch (SchemaChangeException e) {
- throw new ExecutionSetupException("Failure while setting up fields", e);
- }
-
- }
-
- @Override
- public int next() {
- int recordSetSize = Math.min(valueVectors[0].size(), this.config.getRecordCount()- recordsRead);
- recordsRead += recordSetSize;
- return recordSetSize;
- }
-
- @Override
- public void cleanup() {
- for (int i = 0; i < valueVectors.length; i++) {
- try {
- output.removeField(valueVectors[i].getField().getFieldId());
- } catch (SchemaChangeException e) {
- logger.warn("Failure while trying tremove field.", e);
- }
- valueVectors[i].close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockStorageEngine.java
deleted file mode 100644
index cc82540..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockStorageEngine.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.ops.FragmentContext;
-
-import com.google.common.collect.ListMultimap;
-
-public class MockStorageEngine extends AbstractStorageEngine{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
-
- @Override
- public boolean supportsRead() {
- return true;
- }
-
- @Override
- public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
- return null;
- }
-
- @Override
- public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
- return null;
- }
-
- @Override
- public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
- return null;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
index 8785736..771a2fd 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
@@ -7,6 +7,9 @@ drill.exec: {
user.port : 31010,
bit.port : 32010
},
+ operator: {
+ packages += "org.apache.drill.exec.physical.config"
+ },
optimizer: {
implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
},
@@ -30,4 +33,4 @@ drill.exec: {
network: {
start: 35000
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml b/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
index b79b811..3af6e10 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
@@ -1,16 +1,14 @@
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
-
+<!--
<appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
<Compressing>true</Compressing>
<ReconnectionDelay>10000</ReconnectionDelay>
<IncludeCallerData>true</IncludeCallerData>
<RemoteHosts>localhost</RemoteHosts>
</appender>
-
+-->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <!-- encoders are assigned the type
- ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
@@ -30,14 +28,14 @@
-->
<logger name="org.apache.drill" additivity="false">
<level value="debug" />
- <appender-ref ref="SOCKET" />
+<!-- <appender-ref ref="SOCKET" /> -->
<appender-ref ref="STDOUT" />
<!-- <appender-ref ref="FILE" /> -->
</logger>
<root>
<level value="error" />
- <appender-ref ref="SOCKET" />
+<!-- <appender-ref ref="SOCKET" /> -->
<appender-ref ref="STDOUT" />
<!-- <appender-ref ref="FILE" /> -->
</root>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/physical_screen.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_screen.json
index c63aef1..8bb97db 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/physical_screen.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_screen.json
@@ -12,9 +12,8 @@
pop:"mock-scan",
url: "http://apache.org",
entries:[
- {id:1}
- ],
- cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
+ {id:1, records: 100, size: 100}
+ ]
},
{
@id:2,
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/physical_simpleexchange.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_simpleexchange.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_simpleexchange.json
index e332785..85823cf 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/physical_simpleexchange.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_simpleexchange.json
@@ -12,30 +12,44 @@
pop:"mock-scan",
url: "http://apache.org",
entries:[
- {id:1}
- ],
- cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]},
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
},
{
@id:2,
- pop: "partition-to-random-exchange",
- child: 1,
- partition: {
- mode: "DUPLICATE"
- }
+ pop: "hash-to-random-exchange",
+ child: 1
},
{
@id:3,
child: 2,
pop:"filter",
expr: "b > 5",
- cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
+ selectivity: 0.8
},
{
@id: 4,
child: 3,
- pop: "mock-store",
- cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
+ pop: "mock-store"
+ },
+ {
+ @id:5,
+ child: 4,
+ pop: "union-exchange"
+ },
+ {
+ @id: 6,
+ child: 5,
+ pop: "screen"
}
]
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
new file mode 100644
index 0000000..675ecfb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
@@ -0,0 +1,34 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+
+ ]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop: "union-exchange"
+ },
+ {
+ @id: 3,
+ child: 2,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/physical_test1.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_test1.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_test1.json
new file mode 100644
index 0000000..0ddd48f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_test1.json
@@ -0,0 +1,40 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"filter",
+ expr: "b > 5",
+ selectivity: 0.80
+ },
+ {
+ @id: 3,
+ child: 2,
+ pop: "mock-store"
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/physical_test2.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_test2.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_test2.json
new file mode 100644
index 0000000..b001921
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_test2.json
@@ -0,0 +1,34 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]},
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id: 2,
+ child: 1,
+ pop: "screen"
+ }
+ ]
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/ref/src/test/resources/donuts.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/donuts.json b/sandbox/prototype/exec/ref/src/test/resources/donuts.json
index 2d98b59..9fd6e3e 100644
--- a/sandbox/prototype/exec/ref/src/test/resources/donuts.json
+++ b/sandbox/prototype/exec/ref/src/test/resources/donuts.json
@@ -20,7 +20,7 @@
[
{ "id": "5001", "type": "None" },
{ "id": "5002", "type": "Glazed" },
- { "id": "5005", "type": "Sugar" },
+ { "id": "5005", "type": "Sugar", color: "White"},
{ "id": "5007", "type": "Powdered Sugar" },
{ "id": "5006", "type": "Chocolate with Sprinkles" },
{ "id": "5003", "type": "Chocolate" },