You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/14 04:35:13 UTC
[9/9] git commit: basic framework for physical plan. abstraction of
graph classes.
basic framework for physical plan. abstraction of graph classes.
Initial Work on Java Exec
WIP commit of java-exec
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b53933f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b53933f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b53933f2
Branch: refs/heads/execwork
Commit: b53933f225e21b890a9cc25545be7ce4223ba0ce
Parents: 2a6e1b3
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat Mar 16 17:56:35 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat Apr 13 03:01:00 2013 -0700
----------------------------------------------------------------------
.../apache/drill/common/config/DrillConfig.java | 51 +-
.../exceptions/DrillConfigurationException.java | 43 +
.../drill/common/exceptions/DrillIOException.java | 42 +
.../drill/common/expression/types/DataType.java | 2 +
.../apache/drill/common/optimize/Optimizer.java | 45 +
.../drill/common/physical/DataValidationMode.java | 24 +
.../org/apache/drill/common/physical/FieldSet.java | 23 +-
.../apache/drill/common/physical/RecordField.java | 43 +-
.../apache/drill/common/physical/ParsePlan.java | 36 +
.../common/src/test/resources/basic_physical.json | 42 +
.../common/src/test/resources/dsort-physical.json | 76 --
.../common/src/test/resources/dsort_logical.json | 40 +
.../common/src/test/resources/dsort_physical.json | 72 ++
.../common/src/test/resources/simple_plan.json | 133 +++
sandbox/prototype/exec/java-exec/pom.xml | 165 +++-
.../prototype/exec/java-exec/rse/ClasspathRSE.java | 88 ++
.../prototype/exec/java-exec/rse/ConsoleRSE.java | 60 +
.../exec/java-exec/rse/FileSystemRSE.java | 144 +++
.../exec/java-exec/rse/JSONDataWriter.java | 142 +++
.../exec/java-exec/rse/JSONRecordReader.java | 183 ++++
.../exec/java-exec/rse/OutputStreamWriter.java | 78 ++
sandbox/prototype/exec/java-exec/rse/QueueRSE.java | 100 ++
sandbox/prototype/exec/java-exec/rse/RSEBase.java | 71 ++
.../prototype/exec/java-exec/rse/RSERegistry.java | 85 ++
.../prototype/exec/java-exec/rse/RecordReader.java | 28 +
.../exec/java-exec/rse/RecordRecorder.java | 32 +
.../exec/java-exec/rse/ReferenceStorageEngine.java | 45 +
.../org/apache/drill/exec/BufferAllocator.java | 52 +
.../apache/drill/exec/DirectBufferAllocator.java | 47 +
.../java/org/apache/drill/exec/ExecConstants.java | 31 +
.../apache/drill/exec/cache/DistributedCache.java | 38 +
.../org/apache/drill/exec/cache/HazelCache.java | 133 +++
.../drill/exec/cache/TemplatizedLogicalPlan.java | 22 +
.../drill/exec/cache/TemplatizedPhysicalPlan.java | 22 +
.../drill/exec/coord/ClusterCoordinator.java | 47 +
.../exec/coord/DrillServiceInstanceHelper.java | 57 +
.../drill/exec/coord/ZKClusterCoordinator.java | 145 +++
.../drill/exec/coord/ZKRegistrationHandle.java | 32 +
.../java/org/apache/drill/exec/disk/Spool.java | 29 +
.../exec/exception/DrillbitStartupException.java | 46 +
.../exec/exception/ExecutionSetupException.java | 45 +
.../exec/exception/SchemaChangeException.java | 52 +
.../drill/exec/exception/SetupException.java | 46 +
.../org/apache/drill/exec/ops/BatchIterator.java | 32 +
.../org/apache/drill/exec/ops/FragmentContext.java | 49 +
.../org/apache/drill/exec/ops/OutputMutator.java | 28 +
.../org/apache/drill/exec/ops/QueryOutcome.java | 22 +
.../java/org/apache/drill/exec/ops/ScanBatch.java | 157 +++
.../exec/ops/exchange/PartitioningSender.java | 23 +
.../drill/exec/ops/exchange/RandomReceiver.java | 24 +
.../drill/exec/ops/exchange/RecordBatchSender.java | 24 +
.../apache/drill/exec/opt/IdentityOptimizer.java | 40 +
.../org/apache/drill/exec/planner/ExecPlanner.java | 27 +
.../org/apache/drill/exec/record/BatchSchema.java | 123 +++
.../java/org/apache/drill/exec/record/DeadBuf.java | 848 +++++++++++++++
.../drill/exec/record/InvalidValueAccessor.java | 46 +
.../drill/exec/record/MaterializedField.java | 79 ++
.../org/apache/drill/exec/record/RecordBatch.java | 85 ++
.../org/apache/drill/exec/record/RecordMaker.java | 22 +
.../record/vector/AbstractFixedValueVector.java | 60 +
.../apache/drill/exec/record/vector/AnyVector.java | 30 +
.../drill/exec/record/vector/BaseValueVector.java | 104 ++
.../apache/drill/exec/record/vector/BitUtil.java | 108 ++
.../apache/drill/exec/record/vector/BitVector.java | 118 ++
.../apache/drill/exec/record/vector/BufBitSet.java | 847 ++++++++++++++
.../drill/exec/record/vector/ByteVector.java | 48 +
.../drill/exec/record/vector/Int32Vector.java | 52 +
.../exec/record/vector/NullableValueVector.java | 70 ++
.../drill/exec/record/vector/ValueVector.java | 81 ++
.../drill/exec/record/vector/VariableVector.java | 78 ++
.../org/apache/drill/exec/rpc/BasicClient.java | 81 ++
.../org/apache/drill/exec/rpc/BasicServer.java | 106 ++
.../drill/exec/rpc/ChannelClosedException.java | 39 +
.../apache/drill/exec/rpc/CoordinationQueue.java | 87 ++
.../org/apache/drill/exec/rpc/DrillRpcFuture.java | 92 ++
.../apache/drill/exec/rpc/InboundRpcMessage.java | 50 +
.../apache/drill/exec/rpc/NamedThreadFactory.java | 48 +
.../apache/drill/exec/rpc/OutboundRpcMessage.java | 50 +
.../drill/exec/rpc/PositiveAtomicInteger.java | 39 +
.../apache/drill/exec/rpc/RemoteRpcException.java | 38 +
.../java/org/apache/drill/exec/rpc/Response.java | 41 +
.../java/org/apache/drill/exec/rpc/RpcBus.java | 172 +++
.../org/apache/drill/exec/rpc/RpcConstants.java | 26 +
.../java/org/apache/drill/exec/rpc/RpcDecoder.java | 142 +++
.../java/org/apache/drill/exec/rpc/RpcEncoder.java | 127 +++
.../org/apache/drill/exec/rpc/RpcException.java | 45 +
.../apache/drill/exec/rpc/RpcExceptionHandler.java | 52 +
.../java/org/apache/drill/exec/rpc/RpcMessage.java | 45 +
.../exec/rpc/ZeroCopyProtobufLengthDecoder.java | 80 ++
.../org/apache/drill/exec/rpc/bit/BitClient.java | 62 ++
.../java/org/apache/drill/exec/rpc/bit/BitCom.java | 69 ++
.../apache/drill/exec/rpc/bit/BitComHandler.java | 136 +++
.../org/apache/drill/exec/rpc/bit/BitComImpl.java | 142 +++
.../org/apache/drill/exec/rpc/bit/BitServer.java | 64 ++
.../org/apache/drill/exec/rpc/bit/BitTunnel.java | 63 ++
.../org/apache/drill/exec/rpc/user/UserClient.java | 72 ++
.../org/apache/drill/exec/rpc/user/UserServer.java | 90 ++
.../org/apache/drill/exec/schema/BackedRecord.java | 44 +
.../org/apache/drill/exec/schema/DataRecord.java | 56 +
.../org/apache/drill/exec/schema/DiffSchema.java | 66 ++
.../java/org/apache/drill/exec/schema/Field.java | 135 +++
.../org/apache/drill/exec/schema/IdGenerator.java | 13 +
.../org/apache/drill/exec/schema/ListSchema.java | 108 ++
.../org/apache/drill/exec/schema/NamedField.java | 44 +
.../org/apache/drill/exec/schema/ObjectSchema.java | 91 ++
.../org/apache/drill/exec/schema/OrderedField.java | 33 +
.../java/org/apache/drill/exec/schema/Record.java | 29 +
.../org/apache/drill/exec/schema/RecordSchema.java | 29 +
.../drill/exec/schema/SchemaIdGenerator.java | 36 +
.../apache/drill/exec/schema/SchemaRecorder.java | 122 +++
.../exec/schema/json/jackson/JacksonHelper.java | 63 ++
.../exec/schema/json/jackson/PhysicalOperator.java | 36 +
.../json/jackson/PhysicalOperatorIterator.java | 31 +
.../drill/exec/schema/json/jackson/ScanJson.java | 203 ++++
.../transform/ProtobufSchemaTransformer.java | 109 ++
.../exec/schema/transform/SchemaTransformer.java | 30 +
.../org/apache/drill/exec/server/Drillbit.java | 116 ++
.../apache/drill/exec/server/DrillbitContext.java | 65 ++
.../apache/drill/exec/server/StartupOptions.java | 66 ++
.../apache/drill/exec/service/ServiceEngine.java | 73 ++
.../drill/exec/store/QueryOptimizerRule.java | 21 +
.../org/apache/drill/exec/store/RecordReader.java | 49 +
.../apache/drill/exec/store/RecordRecorder.java | 36 +
.../org/apache/drill/exec/store/StorageEngine.java | 92 ++
.../drill/exec/store/StorageEngineRegistry.java | 82 ++
.../java-exec/src/main/protobuf/Coordination.proto | 32 +
.../src/main/protobuf/ExecutionProtos.proto | 65 ++
.../java-exec/src/main/protobuf/GeneralRPC.proto | 35 +
.../java-exec/src/main/protobuf/SchemaDef.proto | 37 +
.../exec/java-exec/src/main/protobuf/User.proto | 93 ++
.../java-exec/src/main/resources/drill-module.conf | 28 +
.../java-exec/src/test/java/BBOutputStream.java | 38 +
.../src/test/java/CompressingBytesColumn.java | 46 +
.../exec/java-exec/src/test/java/ExternalSort.java | 21 +
.../src/test/java/GenerateExternalSortData.java | 124 +++
.../drill/exec/record/column/SimpleExec.java | 30 +
.../drill/exec/record/vector/TestOpenBitSet.java | 361 ++++++
.../apache/drill/exec/rpc/user/UserRpcTest.java | 107 ++
.../apache/drill/exec/server/StartDrillbit.java | 31 +
.../exec/java-exec/src/test/resources/logback.xml | 45 +
sandbox/prototype/exec/ref/pom.xml | 7 +-
141 files changed, 10854 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index a775867..b738002 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.drill.common.exceptions.DrillConfigurationException;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.StorageEngineConfigBase;
import org.apache.drill.common.logical.data.LogicalOperatorBase;
@@ -59,6 +60,14 @@ public final class DrillConfig extends NestedConfig{
};
/**
+ * Create a DrillConfig object using the default config file name
+ * @return The new DrillConfig object.
+ */
+ public static DrillConfig create() {
+ return create(null);
+ }
+
+ /**
* <p>
* DrillConfig loads up Drill configuration information. It does this utilizing a combination of classpath scanning
* and Configuration fallbacks provided by the TypeSafe configuration library. The order of precedence is as
@@ -68,16 +77,20 @@ public final class DrillConfig extends NestedConfig{
* Configuration values are retrieved as follows:
* <ul>
* <li>Check a single copy of "drill-override.conf". If multiple copies are on the classpath, behavior is
- * indeterminate.</li>
+ * indeterminate. If a non-null value for overrideFileName is provided, this is utilized instead of drill-override.conf.</li>
* <li>Check all copies of "drill-module.conf". Loading order is indeterminate.</li>
* <li>Check a single copy of "drill-default.conf". If multiple copies are on the classpath, behavior is
* indeterminate.</li>
* </ul>
*
* </p>
- * * @return A merged Config object.
+ * @param overrideFileName The name of the file to use for override purposes.
+ * @return A merged Config object.
*/
- public static DrillConfig create() {
+ public static DrillConfig create(String overrideFileName) {
+
+ overrideFileName = overrideFileName == null ? CommonConstants.CONFIG_OVERRIDE : overrideFileName;
+
// first we load defaults.
Config fallback = ConfigFactory.load(CommonConstants.CONFIG_DEFAULT);
Collection<URL> urls = PathScanner.getConfigURLs();
@@ -86,10 +99,40 @@ public final class DrillConfig extends NestedConfig{
fallback = ConfigFactory.parseURL(url).withFallback(fallback);
}
- Config c = ConfigFactory.load(CommonConstants.CONFIG_OVERRIDE).withFallback(fallback).resolve();
+ Config c = ConfigFactory.load(overrideFileName).withFallback(fallback).resolve();
return new DrillConfig(c);
}
+ public <T> Class<T> getClassAt(String location, Class<T> clazz) throws DrillConfigurationException{
+ String className = this.getString(location);
+ if(className == null) throw new DrillConfigurationException(String.format("No class defined at location '%s'. Expected a definition of the class [%s]", location, clazz.getCanonicalName()));
+ try{
+ Class<?> c = Class.forName(className);
+ if(clazz.isAssignableFrom(c)){
+ @SuppressWarnings("unchecked") Class<T> t = (Class<T>) c;
+ return t;
+ }else{
+ throw new DrillConfigurationException(String.format("The class [%s] listed at location '%s' should be of type [%s]. It isn't.", className, location, clazz.getCanonicalName()));
+ }
+ }catch(Exception ex){
+ if(ex instanceof DrillConfigurationException) throw (DrillConfigurationException) ex;
+ throw new DrillConfigurationException(String.format("Failure while initializing class [%s] described at configuration value '%s'.", className, location), ex);
+ }
+
+ }
+
+ public <T> T getInstanceOf(String location, Class<T> clazz) throws DrillConfigurationException{
+ Class<T> c = getClassAt(location, clazz);
+ try{
+ T t = c.newInstance();
+ return t;
+ }catch(Exception ex){
+ throw new DrillConfigurationException(String.format("Failure while instantiating class [%s] located at '%s'.", clazz.getCanonicalName(), location), ex);
+ }
+ }
+
+
+
public void setSinkQueues(int number, Queue<Object> queue){
sinkQueues.set(number, queue);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillConfigurationException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillConfigurationException.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillConfigurationException.java
new file mode 100644
index 0000000..aa83758
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillConfigurationException.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.exceptions;
+
+public class DrillConfigurationException extends DrillException {
+ public DrillConfigurationException() {
+ super();
+ }
+
+ public DrillConfigurationException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+ public DrillConfigurationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DrillConfigurationException(String message) {
+ super(message);
+ }
+
+ public DrillConfigurationException(Throwable cause) {
+ super(cause);
+ }
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfigurationException.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillIOException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillIOException.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillIOException.java
new file mode 100644
index 0000000..cd7d4ab
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/exceptions/DrillIOException.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.exceptions;
+
+import java.io.IOException;
+
+public class DrillIOException extends IOException{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillIOException.class);
+
+ public DrillIOException() {
+ super();
+ }
+
+ public DrillIOException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DrillIOException(String message) {
+ super(message);
+ }
+
+ public DrillIOException(Throwable cause) {
+ super(cause);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
index 776a9e8..60d26dc 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
@@ -54,6 +54,8 @@ public abstract class DataType {
public static final DataType LATEBIND = new LateBindType();
public static final DataType BOOLEAN = new AtomType("BOOLEAN", Comparability.EQUAL, false);
public static final DataType BYTES = new AtomType("BYTES", Comparability.ORDERED, false);
+ public static final DataType SIGNED_BYTE = new AtomType("SIGNED_BYTE", Comparability.ORDERED, true);
+ public static final DataType SIGNED_INT16 = new AtomType("SIGNED_INT16", Comparability.ORDERED, true);
public static final DataType NVARCHAR = new AtomType("VARCHAR", Comparability.ORDERED, false);
public static final DataType FLOAT32 = new AtomType("FLOAT32", Comparability.ORDERED, true);
public static final DataType FLOAT64 = new AtomType("FLOAT64", Comparability.ORDERED, true);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/optimize/Optimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/optimize/Optimizer.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/optimize/Optimizer.java
new file mode 100644
index 0000000..4b2037c
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/optimize/Optimizer.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.optimize;
+
+import java.io.Closeable;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillConfigurationException;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.physical.PhysicalPlan;
+
+public abstract class Optimizer implements Closeable{
+
+ public static String OPTIMIZER_IMPL_KEY = "drill.exec.optimizer.implementation";
+
+ public abstract void init(DrillConfig config);
+
+ public abstract PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan);
+ public abstract void close();
+
+ public static Optimizer getOptimizer(DrillConfig config) throws DrillConfigurationException{
+ Optimizer o = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class);
+ o.init(config);
+ return o;
+ }
+
+ public interface OptimizationContext{
+ public int getPriority();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/DataValidationMode.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/DataValidationMode.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/DataValidationMode.java
new file mode 100644
index 0000000..6de2cfd
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/DataValidationMode.java
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical;
+
+public enum DataValidationMode {
+ TERMINATE, // terminate the query if the data doesn't match expected.
+ DROP_RECORD, // drop the record that doesn't match the expected situation.
+ SINK_RECORD // record the failed record along with the rule violation in a secondary location.
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/FieldSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/FieldSet.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/FieldSet.java
index 05fc49d..c76098d 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/FieldSet.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/FieldSet.java
@@ -18,7 +18,6 @@
package org.apache.drill.common.physical;
import java.io.IOException;
-import java.util.HashSet;
import java.util.List;
import org.apache.drill.common.physical.FieldSet.De;
@@ -42,19 +41,10 @@ import com.google.common.collect.Lists;
public class FieldSet {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FieldSet.class);
- private List<RecordField> incoming = Lists.newArrayList();
- private List<RecordField> outgoing = Lists.newArrayList();
+ private List<RecordField> fields;
public FieldSet(Iterable<RecordField> fields){
- for(RecordField f : fields){
- if(f.getRoute().isIn()){
- incoming.add(f);
- }
-
- if(f.getRoute().isOut()){
- outgoing.add(f);
- }
- }
+ this.fields = Lists.newArrayList(fields);
}
@@ -83,14 +73,7 @@ public class FieldSet {
@Override
public void serialize(FieldSet value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
JsonGenerationException {
- HashSet<RecordField> fields = new HashSet<RecordField>();
- for(RecordField f: value.incoming){
- fields.add(f);
- }
- for(RecordField f: value.outgoing){
- fields.add(f);
- }
- jgen.writeObject(Lists.newArrayList(fields));
+ jgen.writeObject(value.fields);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/RecordField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/RecordField.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/RecordField.java
index 821f286..2867084 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/RecordField.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/RecordField.java
@@ -28,14 +28,14 @@ public class RecordField {
private String name;
private DataType type;
- private Route route;
+ private ValueMode mode;
@JsonCreator
- public RecordField(@JsonProperty("name") String name, @JsonProperty("type") DataType type, @JsonProperty("route") Route route) {
+ public RecordField(@JsonProperty("name") String name, @JsonProperty("type") DataType type, @JsonProperty("mode") ValueMode mode) {
super();
this.name = name;
this.type = type;
- this.route = route;
+ this.mode = mode;
}
public String getName() {
@@ -46,34 +46,21 @@ public class RecordField {
return type;
}
- public Route getRoute() {
- return route;
+ public ValueMode getMode() {
+ return mode;
}
+ public static enum ValueMode {
+ VECTOR,
+ DICT,
+ RLE
+ }
+ public static enum ValueType {
+ OPTIONAL,
+ REQUIRED,
+ REPEATED
+ }
- public static enum Route {
- IN(true, false),
- OUT(false, true),
- THROUGH(true, true),
- OPAQUE(true, true);
-
- final boolean in;
- final boolean out;
-
- Route(boolean in, boolean out){
- this.in = in;
- this.out = out;
- }
-
- public boolean isIn() {
- return in;
- }
-
- public boolean isOut() {
- return out;
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePlan.java b/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePlan.java
new file mode 100644
index 0000000..e3e43aa
--- /dev/null
+++ b/sandbox/prototype/common/src/test/java/org/apache/drill/common/physical/ParsePlan.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class ParsePlan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParsePlan.class);
+
+
+ @Test public void parseSimplePlan() throws Exception{
+ DrillConfig c = DrillConfig.create();
+ PhysicalPlan plan = PhysicalPlan.parse(c, Files.toString(FileUtils.getResourceAsFile("/dsort_physical.json"), Charsets.UTF_8));
+ System.out.println(plan.unparse(c));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/resources/basic_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/basic_physical.json b/sandbox/prototype/common/src/test/resources/basic_physical.json
new file mode 100644
index 0000000..4d1d329
--- /dev/null
+++ b/sandbox/prototype/common/src/test/resources/basic_physical.json
@@ -0,0 +1,42 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ storage:{
+ fs1:{
+ type:"mock"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"scan",
+ storageengine:"fs1",
+ entries:[
+ {"test1"}
+ ],
+ output:[
+ { "name":"key", mode: "VECTOR", type:"SINT32"},
+ { "name":"value", mode: "VECTOR", type:"SINT32"}
+ ]
+ },
+ {
+ @id:2,
+ child:1,
+ pop: "store",
+ mode: "SYSTEM_CHOICE",
+ storageengine: "fs1",
+ entries:[
+ {
+ path:"/sort/sorted/${partition_number}.seq",
+ key:"Text",
+ type:"JAVA_SEQUENCE"
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/resources/dsort-physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/dsort-physical.json b/sandbox/prototype/common/src/test/resources/dsort-physical.json
deleted file mode 100644
index 3c57a0a..0000000
--- a/sandbox/prototype/common/src/test/resources/dsort-physical.json
+++ /dev/null
@@ -1,76 +0,0 @@
-{
- head:{
- type:"APACHE_DRILL_PHYSICAL",
- version:"1",
- generator:{
- type:"manual"
- }
- },
- storage:{
- fs1:{
- type:"mock"
- }
- },
- graph:[
- {
- @id:1,
- pop:"scan",
- storageengine:"fs1",
- entries:[{}],
- fields:[
- { "name":"key", route: "OUT", type:"LATE"},
- { "name":"value", route: "OUT", type:"LATE"}
- ]
- },
- {
- @id:2,
- child: 1,
- pop:"quicknwaysort",
- orderings:[
- {
- order: "DESC",
- expr: "data.key"
- }
- ],
- fields:[
- { "name":"key", route: "THROUGH", type:"LATE"},
- { "name":"value", route: "OPAQUE", type:"LATE"}
- ]
-
- },
- {
- @id:3,
- child: 2,
- pop:"exchange",
- partition:{
- mode:"RANGE",
- exprs:["key"]
- },
- stitch:{
- mode:"RANDOM"
- },
- fields:[
- { "name":"key", route: "THROUGH", type:"LATE"},
- { "name":"value", route: "OPAQUE", type:"LATE"}
- ]
- },
- {
- @id:4,
- child:3,
- pop: "store",
- mode: "SYSTEM_CHOICE",
- storageengine: "fs1",
- entries:[
- {
- path:"/sort/sorted/${partition_number}.seq",
- key:"Text",
- type:"JAVA_SEQUENCE"
- }
- ],
- fields:[
- { "name":"key", route: "IN", type:"LATE"},
- { "name":"value", route: "IN", type:"LATE"}
- ]
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/resources/dsort_logical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/dsort_logical.json b/sandbox/prototype/common/src/test/resources/dsort_logical.json
new file mode 100644
index 0000000..83d30e8
--- /dev/null
+++ b/sandbox/prototype/common/src/test/resources/dsort_logical.json
@@ -0,0 +1,40 @@
+{
+ head:{ type:"apache_drill_logical_plan", version:"1", generator:{ type:"manual", info:"na"}},
+ storage:[ { type:"fs", name:"fs1", root:"file:///" }],
+ query:[ { op: "sequence", sequence: [
+ {
+ op:"scan",
+ storageengine:"fs1",
+ ref: "data",
+ selection: {
+ path: "/sort/unsorted/*.seq",
+ type: "JAVA_SEQUENCE"
+ }
+ },
+ {
+ op: "order",
+ orderings: [
+ {order: "desc", expr: "data.key" }
+ ]
+ },
+ {
+ op: "project",
+ projections: [
+ { ref: "output.key", expr: "data.key" },
+ { ref: "output.value", expr: "data.value" }
+ ]
+ },
+ {
+ op: "store",
+ storageengine: "fs1",
+ target: {
+ path: "/sort/sorted/${partition}.seq",
+ type: "JAVA_SEQUENCE",
+ partition: {
+ type: "ORDERED",
+ exprs: ["key"]
+ }
+ }
+ }
+ ]}]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/resources/dsort_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/dsort_physical.json b/sandbox/prototype/common/src/test/resources/dsort_physical.json
new file mode 100644
index 0000000..7c31df2
--- /dev/null
+++ b/sandbox/prototype/common/src/test/resources/dsort_physical.json
@@ -0,0 +1,72 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ storage:{
+ fs1:{
+ type:"mock"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"scan",
+ storageengine:"fs1",
+ entries:[{}],
+ output:[
+ { "name":"key", mode: "VECTOR", type:"LATE"},
+ { "name":"value", mode: "VECTOR", type:"LATE"}
+ ]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"quicknwaysort",
+ orderings:[
+ {
+ order: "DESC",
+ expr: "data.key"
+ }
+ ],
+ output:[
+ { "name":"key", mode: "VECTOR", type:"LATE"},
+ { "name":"value", mode: "VECTOR", type:"LATE"}
+ ]
+
+ },
+ {
+ @id:3,
+ child: 2,
+ pop:"exchange",
+ partition:{
+ mode:"RANGE",
+ exprs:["key"]
+ },
+ stitch:{
+ mode:"RANDOM"
+ },
+ fields:[
+ { "name":"key", mode: "VECTOR", type:"LATE"},
+ { "name":"value", mode: "VECTOR", type:"LATE"}
+ ]
+ },
+ {
+ @id:4,
+ child:3,
+ pop: "store",
+ mode: "SYSTEM_CHOICE",
+ storageengine: "fs1",
+ entries:[
+ {
+ path:"/sort/sorted/${partition_number}.seq",
+ key:"Text",
+ type:"JAVA_SEQUENCE"
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/common/src/test/resources/simple_plan.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/resources/simple_plan.json b/sandbox/prototype/common/src/test/resources/simple_plan.json
new file mode 100644
index 0000000..2457b1f
--- /dev/null
+++ b/sandbox/prototype/common/src/test/resources/simple_plan.json
@@ -0,0 +1,133 @@
+{
+ head:{
+ type:"apache_drill_logical_plan",
+ version:"1",
+ generator:{
+ type:"manual",
+ info:"na"
+ }
+ },
+ storage:[
+ {
+ type:"text", name:"local-logs",
+ file: "local://logs/*.log",
+ compress:"gzip",
+ line-delimiter:"\n",
+ record-maker:{
+ type:"first-row",
+ delimiter:","
+ }
+ },
+ {
+ type:"mongo",
+ name:"users",
+ connection:"mongodb://blue:red@localhost/users"
+ },
+ {
+ type:"mysql",
+ name:"mysql",
+ connection:"jdbc:mysql://localhost/main"
+ }
+ ],
+ query:[
+ {
+ @id:"1",
+ op:"scan",
+ memo:"initial_scan",
+ storageengine:"local-logs",
+ selection: {}
+ },
+ {
+ @id:"2",
+ input:"1",
+ memo:"transform1",
+ op:"transform",
+ transforms:[
+ {
+ ref:"userId",
+ expr:"regex_like('activity.cookie', \"persistent=([^;]*)\")"
+ },
+ {
+ ref:"session",
+ expr:"regex_like('activity.cookie', \"session=([^;]*)\")"
+ }
+ ]
+ },
+ {
+ @id:"3",
+ input:"2",
+ memo:"transform2",
+ op:"transform",
+ transforms:[
+ {
+ ref:"userId",
+ expr:"regex_like('activity.cookie', \"persistent=([^;]*)\")"
+ },
+ {
+ ref:"session",
+ expr:"regex_like('activity.cookie', \"session=([^;]*)\")"
+ }
+ ]
+ },
+ {
+ @id:"7",
+ input:"3",
+ op:"sequence",
+ do:[
+ {
+ op:"transform",
+ memo:"seq_transform",
+ transforms:[
+ {
+ ref:"happy",
+ expr:"regex_like('ep2', \"dink\")"
+ }
+ ]
+ }
+ ,
+ {
+ op:"transform",
+ memo:"last_transform",
+ transforms:[
+ {
+ ref:"abc",
+ expr:"123"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ @id:"10",
+ input:"3",
+ op:"transform",
+ memo:"t3",
+ transforms:[
+ {
+ ref:"happy",
+ expr:"regex_like('ep2', \"dink\")"
+ }
+ ]
+ },
+ {
+ @id:12,
+ op:"join",
+ type: "inner",
+ left:"7",
+ right:"10",
+ conditions: [{relationship:"==", left: "1", right: "1" }]
+ }
+ ,
+ {
+ input: 12,
+ op: "store",
+ memo: "output sink",
+ target: {
+ file: "console:///stdout"
+ }
+
+ }
+
+
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index a458160..663bab4 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -1,15 +1,154 @@
<?xml version="1.0"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>exec-parent</artifactId>
- <groupId>org.apache.drill.exec</groupId>
- <version>1.0-SNAPSHOT</version>
- </parent>
- <artifactId>java-exec</artifactId>
- <name>java-exec</name>
-
- <dependencies>
- </dependencies>
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>exec-parent</artifactId>
+ <groupId>org.apache.drill.exec</groupId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>java-exec</artifactId>
+ <name>java-exec</name>
+
+ <properties>
+ <target.gen.source.path>${project.basedir}/target/generated-sources</target.gen.source.path>
+ <proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-column</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>common</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ <version>1.30</version>
+ </dependency>
+ <dependency>
+ <groupId>com.netflix.curator</groupId>
+ <artifactId>curator-x-discovery</artifactId>
+ <version>1.1.9</version>
+ <exclusions>
+ <!-- <exclusion> -->
+ <!-- <artifactId>netty</artifactId> -->
+ <!-- <groupId>org.jboss.netty</groupId> -->
+ <!-- </exclusion> -->
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.0.5-M3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>1.1.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>jets3t</artifactId>
+ <groupId>net.java.dev.jets3t</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-logging</artifactId>
+ <groupId>commons-logging</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.carrotsearch</groupId>
+ <artifactId>hppc</artifactId>
+ <version>0.4.2</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.0.0.CR1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ <version>2.5</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Registers org.apache.drill.exec.mem.ByteBufferAllocator as a javah class name for the
+      native-maven-plugin. NOTE(review): presumably used to generate JNI headers; no execution
+      or goal is bound here, so confirm how and when it is invoked. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>native-maven-plugin</artifactId>
+ <version>1.0-alpha-7</version>
+ <configuration>
+ <javahClassNames>
+ <javahClassName>org.apache.drill.exec.mem.ByteBufferAllocator</javahClassName>
+ </javahClassNames>
+ </configuration>
+ </plugin>
+ <!-- Protocol buffer code generation, bound to the generate-sources phase. The Ant tasks
+      create ${target.gen.source.path}, collect every *.proto file found directly in
+      ${proto.cas.path} into a space-separated list, and pass that list to the "protoc"
+      executable with java_out set to the generated-sources directory. The executable is
+      referenced by bare name, so it presumably has to be available on the build machine's
+      PATH (TODO confirm). No plugin version is pinned here. -->
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>generate-sources</id>
+ <phase>generate-sources</phase>
+ <configuration>
+ <tasks>
+ <mkdir dir="${target.gen.source.path}" />
+ <path id="proto.path.files">
+ <fileset dir="${proto.cas.path}">
+ <include name="*.proto" />
+ </fileset>
+ </path>
+ <pathconvert pathsep=" " property="proto.files"
+ refid="proto.path.files" />
+
+ <exec executable="protoc">
+ <arg value="--java_out=${target.gen.source.path}" />
+ <arg value="--proto_path=${proto.cas.path}" />
+ <arg line="${proto.files}" />
+ </exec>
+ </tasks>
+ <sourceRoot>${target.gen.source.path}</sourceRoot>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- The protobuf-maven-plugin declaration below is commented out and therefore inactive. -->
+ <!-- <plugin> -->
+ <!-- <groupId>com.github.igor-petruk.protobuf</groupId> -->
+ <!-- <artifactId>protobuf-maven-plugin</artifactId> -->
+ <!-- <version>0.6.2</version> -->
+ <!-- <executions> -->
+ <!-- <execution> -->
+ <!-- <goals> -->
+ <!-- <goal>run</goal> -->
+ <!-- </goals> -->
+ <!-- </execution> -->
+ <!-- </executions> -->
+ <!-- </plugin> -->
+ </plugins>
+ </build>
+
</project>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java b/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java
new file mode 100644
index 0000000..aa8186d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/ClasspathRSE.java
@@ -0,0 +1,88 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
+import org.apache.drill.exec.ref.rops.ROP;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Reference storage engine that reads records from resources located on the Java classpath.
+ *
+ * <p>The engine reports read support through {@link #supportsRead()} and currently only knows
+ * how to read resources whose converter type is {@code JSON}.
+ */
+public class ClasspathRSE extends RSEBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClasspathRSE.class);
+
+  private final DrillConfig dConfig;
+
+  /**
+   * @param engineConfig engine configuration; it carries no settings and is not retained
+   * @param dConfig Drill configuration used to materialize scan selections and to build readers
+   * @throws SetupException declared for parity with other engines; not thrown by this constructor
+   */
+  public ClasspathRSE(ClasspathRSEConfig engineConfig, DrillConfig dConfig) throws SetupException{
+    this.dConfig = dConfig;
+  }
+
+
+  /** Engine configuration, registered under the JSON type name {@code classpath}. */
+  @JsonTypeName("classpath")
+  public static class ClasspathRSEConfig extends StorageEngineConfigBase {
+  }
+
+  /** Scan selection for this engine: a classpath resource path plus the format used to read it. */
+  public static class ClasspathInputConfig implements ReadEntry{
+    public String path;
+    public ConverterType type;
+    // Filled in from the scan's output reference by getReadEntries(); never read from JSON.
+    @JsonIgnore public SchemaPath rootPath;
+  }
+
+  public boolean supportsRead() {
+    return true;
+  }
+
+  /**
+   * Converts the scan's selection into a single read entry.
+   *
+   * @param scan the scan whose selection describes the classpath resource
+   * @return a one-element collection holding the materialized {@link ClasspathInputConfig}
+   */
+  @Override
+  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+    ClasspathInputConfig c = scan.getSelection().getWith(dConfig, ClasspathInputConfig.class);
+    c.rootPath = scan.getOutputReference();
+    return Collections.singleton((ReadEntry) c);
+  }
+
+  /**
+   * Opens the classpath resource named by the read entry and wraps it in a record reader.
+   *
+   * @param readEntry an entry previously produced by {@link #getReadEntries(Scan)}
+   * @param parentROP the operator that will own the returned reader
+   * @return a reader over the resource's records
+   * @throws IOException if the resource cannot be found or opened
+   * @throws UnsupportedOperationException if the entry has no converter type or a type other than JSON
+   */
+  @Override
+  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
+    ClasspathInputConfig e = getReadEntry(ClasspathInputConfig.class, readEntry);
+    URL u = RecordReader.class.getResource(e.path);
+    if(u == null){
+      throw new IOException(String.format("Failure finding classpath resource %s.", e.path));
+    }
+    // Switching on a null enum would fail with an unexplained NullPointerException.
+    if(e.type == null){
+      throw new UnsupportedOperationException(String.format("No converter type was specified for classpath resource %s.", e.path));
+    }
+    switch(e.type){
+    case JSON:
+      return new JSONRecordReader(e.rootPath, dConfig, u.openStream(), parentROP);
+    default:
+      throw new UnsupportedOperationException(String.format("Reading classpath resources of type %s is not supported.", e.type));
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java b/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java
new file mode 100644
index 0000000..1570ea9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/ConsoleRSE.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.OutputStream;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Reference storage engine whose writer sends records to the process's standard output or
+ * standard error stream.
+ */
+public class ConsoleRSE extends RSEBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConsoleRSE.class);
+
+  /** Console stream that a store target may select. */
+  public enum Pipe {
+    STD_OUT, STD_ERR
+  }
+
+  /** Store target settings; both fields have defaults, so an empty target writes JSON to stdout. */
+  public static class ConsoleOutputConfig {
+    public Pipe pipe = Pipe.STD_OUT;
+    public ConverterType type = ConverterType.JSON;
+  }
+
+  /** Engine configuration, registered under the JSON type name {@code console}. */
+  @JsonTypeName("console")
+  public static class ConsoleRSEConfig extends StorageEngineConfigBase {}
+
+  private final DrillConfig dConfig;
+
+  /**
+   * @param engineConfig engine configuration; it carries no settings and is not retained
+   * @param dConfig Drill configuration used to materialize store targets
+   */
+  public ConsoleRSE(ConsoleRSEConfig engineConfig, DrillConfig dConfig){
+    this.dConfig = dConfig;
+  }
+
+  public boolean supportsWrite() {
+    return true;
+  }
+
+  /**
+   * Builds a recorder that writes to the console stream chosen by the store's target.
+   *
+   * @param store the store operator whose target is read as a {@link ConsoleOutputConfig}
+   * @return a recorder over {@code System.out} when the pipe is {@code STD_OUT}, otherwise over
+   *         {@code System.err}
+   */
+  @Override
+  public RecordRecorder getWriter(Store store) {
+    ConsoleOutputConfig target = store.getTarget().getWith(dConfig, ConsoleOutputConfig.class);
+
+    OutputStream console;
+    if (target.pipe == Pipe.STD_OUT) {
+      console = System.out;
+    } else {
+      console = System.err;
+    }
+
+    // The final argument is the writer's closeStream flag; the console streams are left open.
+    return new OutputStreamWriter(console, target.type, false);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java b/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java
new file mode 100644
index 0000000..522191b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/FileSystemRSE.java
@@ -0,0 +1,144 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
+import org.apache.drill.exec.ref.rops.ROP;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+public class FileSystemRSE extends RSEBase {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemRSE.class);
+
+ private FileSystem fs;
+ private Path basePath;
+ private DrillConfig dConfig;
+
+ public FileSystemRSE(FileSystemRSEConfig engineConfig, DrillConfig dConfig) throws SetupException{
+ this.dConfig = dConfig;
+
+ try {
+ URI u = new URI(engineConfig.root);
+ String path = u.getPath();
+
+ if(path.charAt(path.length()-1) != '/') throw new SetupException(String.format("The file root provided of %s included a file '%s'. This must be a base path.", engineConfig.root, u.getPath()));
+ fs = FileSystem.get(u, new Configuration());
+ basePath = new Path(u.getPath());
+ } catch (URISyntaxException | IOException e) {
+ throw new SetupException("Failure while reading setting up file system root path.", e);
+ }
+ }
+
+
+ @JsonTypeName("fs")
+ public static class FileSystemRSEConfig extends StorageEngineConfigBase {
+ private String root;
+
+ @JsonCreator
+ public FileSystemRSEConfig(@JsonProperty("root") String root) {
+ this.root = root;
+ }
+ }
+
+ public static class FileSystemInputConfig {
+ public FileSpec[] files;
+ }
+
+ public static class FileSpec{
+ public String path;
+ public ConverterType type;
+ }
+
+
+ public class FSEntry implements ReadEntry{
+ Path path;
+ ConverterType type;
+ SchemaPath rootPath;
+
+ public FSEntry(FileSpec spec, SchemaPath rootPath){
+ this.path = new Path(basePath, spec.path);
+ this.type = spec.type;
+ this.rootPath = rootPath;
+ }
+
+ }
+
+ public class FileSystemOutputConfig {
+ public String file;
+ public ConverterType type;
+ }
+
+ public boolean supportsRead() {
+ return true;
+ }
+
+ public boolean supportsWrite() {
+ return true;
+ }
+
+ @Override
+ public RecordRecorder getWriter(Store store) throws IOException {
+ FileSystemOutputConfig config = store.getTarget().getWith(dConfig, FileSystemOutputConfig.class);
+ OutputStream out = fs.create(new Path(basePath, config.file));
+ return new OutputStreamWriter(out, config.type, true);
+ }
+
+ @Override
+ public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+ Set<ReadEntry> s = new HashSet<ReadEntry>();
+ for(FileSpec f : scan.getSelection().getWith(dConfig, FileSystemInputConfig.class).files){
+ s.add(new FSEntry(f, scan.getOutputReference()));
+ }
+ return s;
+ }
+
+ @Override
+ public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
+ FSEntry e = getReadEntry(FSEntry.class, readEntry);
+
+ switch(e.type){
+ case JSON:
+ return new JSONRecordReader(e.rootPath, dConfig, fs.open(e.path), parentROP);
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java b/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java
new file mode 100644
index 0000000..24434d5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/JSONDataWriter.java
@@ -0,0 +1,142 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.drill.exec.ref.rops.DataWriter;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+
+/**
+ * {@link DataWriter} that renders the writer callbacks as JSON through a Jackson
+ * {@link JsonGenerator} configured with the default pretty printer and UTF-8 encoding.
+ *
+ * <p>Callbacks that have no JSON counterpart (record start, element and map-value boundaries)
+ * are intentionally empty.
+ */
+public class JSONDataWriter implements DataWriter{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONDataWriter.class);
+
+  private final JsonGenerator generator;
+
+  /**
+   * @param out stream that receives the UTF-8 encoded JSON
+   * @throws IOException if the generator cannot be created
+   */
+  public JSONDataWriter(OutputStream out) throws IOException{
+    this.generator = new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
+    this.generator.useDefaultPrettyPrinter();
+  }
+
+  /** No output is produced when a record starts. */
+  @Override
+  public void startRecord() throws IOException {
+  }
+
+  /** Opens a JSON array; the announced length is not needed and is ignored. */
+  @Override
+  public void writeArrayStart(int length) throws IOException {
+    generator.writeStartArray();
+  }
+
+  /** Element boundaries need no explicit output. */
+  @Override
+  public void writeArrayElementStart() throws IOException {
+  }
+
+  /** Element boundaries need no explicit output. */
+  @Override
+  public void writeArrayElementEnd() throws IOException {
+  }
+
+  /** Closes the current JSON array. */
+  @Override
+  public void writeArrayEnd() throws IOException {
+    generator.writeEndArray();
+  }
+
+  /** Opens a JSON object. */
+  @Override
+  public void writeMapStart() throws IOException {
+    generator.writeStartObject();
+  }
+
+  /** Writes the given key as a JSON field name. */
+  @Override
+  public void writeMapKey(CharSequence seq) throws IOException {
+    generator.writeFieldName(seq.toString());
+  }
+
+  /** Value boundaries need no explicit output. */
+  @Override
+  public void writeMapValueStart() throws IOException {
+  }
+
+  /** Value boundaries need no explicit output. */
+  @Override
+  public void writeMapValueEnd() throws IOException {
+  }
+
+  /** Closes the current JSON object. */
+  @Override
+  public void writeMapEnd() throws IOException {
+    generator.writeEndObject();
+  }
+
+  /** Writes a JSON boolean. */
+  @Override
+  public void writeBoolean(boolean b) throws IOException {
+    generator.writeBoolean(b);
+  }
+
+  /** Writes a 32-bit integer as a JSON number. */
+  @Override
+  public void writeSInt32(int value) throws IOException {
+    generator.writeNumber(value);
+  }
+
+  /** Writes a 64-bit integer as a JSON number. */
+  @Override
+  public void writeSInt64(long value) throws IOException {
+    generator.writeNumber(value);
+  }
+
+  /** Writes the bytes through the generator's binary encoding. */
+  @Override
+  public void writeBytes(byte[] bytes) throws IOException {
+    generator.writeBinary(bytes);
+  }
+
+  /** Writes a double as a JSON number. */
+  @Override
+  public void writeSFloat64(double value) throws IOException {
+    generator.writeNumber(value);
+  }
+
+  /** Writes a float as a JSON number. */
+  @Override
+  public void writeSFloat32(float value) throws IOException {
+    generator.writeNumber(value);
+  }
+
+  /** Writes a JSON null. */
+  @Override
+  public void writeNullValue() throws IOException {
+    generator.writeNull();
+  }
+
+  /** Writes the character sequence as a JSON string. */
+  @Override
+  public void writeCharSequence(CharSequence value) throws IOException {
+    generator.writeString(value.toString());
+  }
+
+  /** Emits a raw newline after each record. */
+  @Override
+  public void endRecord() throws IOException {
+    generator.writeRawValue("\n");
+  }
+
+  /** Closes the underlying generator. */
+  public void finish() throws IOException{
+    generator.close();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java
new file mode 100644
index 0000000..7510e72
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/JSONRecordReader.java
@@ -0,0 +1,183 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.RunOutcome;
+import org.apache.drill.exec.ref.UnbackedRecord;
+import org.apache.drill.exec.ref.exceptions.RecordException;
+import org.apache.drill.exec.ref.rops.ROP;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.ScalarValues.BooleanScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.BytesScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.DoubleScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.IntegerScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.LongScalar;
+import org.apache.drill.exec.ref.values.ScalarValues.StringScalar;
+import org.apache.drill.exec.ref.values.SimpleArrayValue;
+import org.apache.drill.exec.ref.values.SimpleMapValue;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
+
+/**
+ * {@link RecordReader} that parses a UTF-8 stream of consecutive JSON values and exposes each
+ * value as one record rooted at the configured schema path.
+ *
+ * <p>A single {@link UnbackedRecord} instance is reused for every record, so the pointer handed
+ * out by the iterator is overwritten on each call to {@code next()}.
+ */
+public class JSONRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+
+  private final InputStreamReader input;
+  private final SchemaPath rootPath;
+  private final JsonParser parser;
+  private final UnbackedRecord record = new UnbackedRecord();
+  private final ObjectMapper mapper;
+  private final ROP parent;
+
+  /**
+   * @param rootPath path under which each parsed value is placed in the record
+   * @param dConfig Drill configuration that supplies the JSON object mapper
+   * @param stream source of JSON text, decoded as UTF-8
+   * @param parent operator reported as the parent of this reader's iterator
+   * @throws IOException if the JSON parser cannot be created
+   */
+  public JSONRecordReader(SchemaPath rootPath, DrillConfig dConfig, InputStream stream, ROP parent) throws IOException {
+    this.input = new InputStreamReader(stream, Charsets.UTF_8);
+    this.mapper = dConfig.getMapper();
+    this.parser = mapper.getFactory().createJsonParser(input);
+    this.parent = parent;
+    this.rootPath = rootPath;
+  }
+
+  /** Iterator that pulls one JSON value per call from the enclosing reader's parser. */
+  private class NodeIter implements RecordIterator {
+
+    /**
+     * Advances to the next JSON value and loads it into the shared record.
+     *
+     * @return {@code NONE_LEFT} when the input is exhausted, otherwise
+     *         {@code INCREMENTED_SCHEMA_CHANGED} (no schema comparison is performed yet)
+     * @throws RecordException if reading or parsing the input fails
+     */
+    @Override
+    public NextOutcome next() {
+      try {
+        // A null token means the end of the input has been reached.
+        if (parser.nextToken() == null) {
+          return NextOutcome.NONE_LEFT;
+        }
+        JsonNode n = mapper.readTree(parser);
+        if (n == null) {
+          return NextOutcome.NONE_LEFT;
+        }
+        record.setClearAndSetRoot(rootPath, convert(n));
+        // todo, add schema checking here.
+        return NextOutcome.INCREMENTED_SCHEMA_CHANGED;
+      } catch (IOException e) {
+        throw new RecordException("Failure while reading record", null, e);
+      }
+    }
+
+
+    @Override
+    public RecordPointer getRecordPointer() {
+      return record;
+    }
+
+
+    @Override
+    public ROP getParent() {
+      return parent;
+    }
+
+  }
+
+  /**
+   * Recursively converts a Jackson tree node into the matching {@link DataValue}.
+   *
+   * @param node the node to convert; null, JSON null and missing nodes all become the null value
+   * @return the converted value
+   * @throws UnsupportedOperationException for big decimal and big integer nodes and for any
+   *         node kind that has no mapping
+   */
+  private DataValue convert(JsonNode node) {
+    if (node == null || node.isNull() || node.isMissingNode()) {
+      return DataValue.NULL_VALUE;
+    } else if (node.isArray()) {
+      SimpleArrayValue arr = new SimpleArrayValue(node.size());
+      for (int i = 0; i < node.size(); i++) {
+        arr.addToArray(i, convert(node.get(i)));
+      }
+      return arr;
+    } else if (node.isObject()) {
+      SimpleMapValue map = new SimpleMapValue();
+      for (Iterator<String> iter = node.fieldNames(); iter.hasNext();) {
+        String name = iter.next();
+        map.setByName(name, convert(node.get(name)));
+      }
+      return map;
+    } else if (node.isBinary()) {
+      try {
+        return new BytesScalar(node.binaryValue());
+      } catch (IOException e) {
+        throw new RuntimeException("Failure converting binary value.", e);
+      }
+    } else if (node.isBigDecimal()) {
+      throw new UnsupportedOperationException("Big decimal values are not supported yet.");
+    } else if (node.isBigInteger()) {
+      throw new UnsupportedOperationException("Big integer values are not supported yet.");
+    } else if (node.isBoolean()) {
+      return new BooleanScalar(node.asBoolean());
+    } else if (node.isFloatingPointNumber()) {
+      // Big decimals were rejected above, so every floating point node reaching here is read as a double.
+      return new DoubleScalar(node.asDouble());
+    } else if (node.isInt()) {
+      return new IntegerScalar(node.asInt());
+    } else if (node.isLong()) {
+      return new LongScalar(node.asLong());
+    } else if (node.isTextual()) {
+      return new StringScalar(node.asText());
+    } else {
+      throw new UnsupportedOperationException(String.format("Don't know how to convert value of type %s.", node
+          .getClass().getCanonicalName()));
+    }
+
+  }
+
+
+  /**
+   * Returns a new iterator over this reader's records. Every iterator shares the same parser
+   * and record instance, so iterators are not independent of one another.
+   */
+  @Override
+  public RecordIterator getIterator() {
+    return new NodeIter();
+  }
+
+  /**
+   * Closes the parser and the underlying reader. Each is closed in its own step so that a
+   * failure closing the parser does not leave the reader open; failures are logged, not thrown.
+   */
+  @Override
+  public void cleanup() {
+    try {
+      parser.close();
+    } catch (IOException e) {
+      logger.warn("Error while closing JSON parser.", e);
+    }
+    try {
+      input.close();
+    } catch (IOException e) {
+      logger.warn("Error while closing JSON input stream.", e);
+    }
+  }
+
+
+  /** Nothing to prepare; all state is created in the constructor. */
+  @Override
+  public void setup() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java b/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java
new file mode 100644
index 0000000..20d5b8f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/OutputStreamWriter.java
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.RunOutcome.OutcomeType;
+import org.apache.drill.exec.ref.rops.DataWriter;
+import org.apache.drill.exec.ref.rops.DataWriter.ConverterType;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+/**
+ * {@link RecordRecorder} that serializes each record to an {@link OutputStream} through a
+ * {@link DataWriter} selected by {@link ConverterType}. Only {@code JSON} is handled; any other
+ * type makes {@link #setup()} throw {@link UnsupportedOperationException}.
+ *
+ * <p>{@link #setup()} creates the writer, so it has to run before
+ * {@link #recordRecord(RecordPointer)}.
+ */
+public class OutputStreamWriter implements RecordRecorder{
+
+  private final OutputStream stream;
+  /** Same object as {@code stream} when it is an {@link FSDataOutputStream}, otherwise null. */
+  private final FSDataOutputStream posStream;
+  private final ConverterType type;
+  private final boolean closeStream;
+  /** Created by {@link #setup()}; null until then. */
+  private DataWriter writer;
+
+  /**
+   * @param stream destination for the serialized records
+   * @param type selects the writer implementation created in {@link #setup()}
+   * @param closeStream when true {@link #finish(OutcomeType)} closes {@code stream}; when false
+   *          it only flushes it
+   */
+  public OutputStreamWriter(OutputStream stream, ConverterType type, boolean closeStream){
+    this.stream = stream;
+    this.closeStream = closeStream;
+    this.type = type;
+    this.posStream = (stream instanceof FSDataOutputStream) ? (FSDataOutputStream) stream : null;
+  }
+
+  /**
+   * Instantiates the writer matching the configured converter type.
+   *
+   * @throws UnsupportedOperationException for any type other than {@code JSON}
+   */
+  @Override
+  public void setup() throws IOException {
+    switch(type){
+    case JSON:
+      this.writer = new JSONDataWriter(stream);
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /** Current position of the positioned stream, or 0 when the stream does not expose one. */
+  private long getPos() throws IOException{
+    if(posStream == null) return 0;
+    return posStream.getPos();
+  }
+
+  /**
+   * Writes one record through the writer.
+   *
+   * @return the stream position after the write, or 0 when the stream is not an
+   *         {@link FSDataOutputStream}
+   */
+  @Override
+  public long recordRecord(RecordPointer pointer) throws IOException {
+    pointer.write(writer);
+    return getPos();
+  }
+
+  /**
+   * Finishes the writer and then closes or flushes the stream. The stream handling sits in a
+   * finally block so the stream is not leaked when finishing the writer fails, and the writer is
+   * skipped when {@link #setup()} never created it.
+   */
+  @Override
+  public void finish(OutcomeType outcome) throws IOException {
+    try{
+      if(writer != null){
+        writer.finish();
+      }
+    }finally{
+      if(closeStream){
+        stream.close();
+      }else{
+        stream.flush();
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/QueueRSE.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/QueueRSE.java b/sandbox/prototype/exec/java-exec/rse/QueueRSE.java
new file mode 100644
index 0000000..9a0a132
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/QueueRSE.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.RunOutcome.OutcomeType;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Write-only storage engine whose recorder serializes each record to JSON bytes and adds them
+ * to an in-memory {@link Queue}.
+ *
+ * <p>NOTE(review): {@link #getWriter(Store)} takes its queue from the {@link DrillConfig}, while
+ * {@link #getQueue(int)} serves the single queue created in the constructor - confirm that both
+ * are meant to coexist.
+ */
+public class QueueRSE extends RSEBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueueRSE.class);
+
+  private final DrillConfig dConfig;
+  private final List<Queue<Object>> sinkQueues;
+
+  /**
+   * @param engineConfig engine configuration; not read by this constructor
+   * @param dConfig configuration later used to resolve the output target and its queue
+   */
+  public QueueRSE(QueueRSEConfig engineConfig, DrillConfig dConfig) throws SetupException{
+    this.dConfig = dConfig;
+    // A single bounded queue with capacity 100.
+    sinkQueues = Collections.<Queue<Object>>singletonList(new ArrayBlockingQueue<Object>(100));
+  }
+
+  /** Returns the engine-owned queue at the given index; only index 0 exists. */
+  public Queue<Object> getQueue(int number){
+    return sinkQueues.get(number);
+  }
+
+  @JsonTypeName("queue") public static class QueueRSEConfig extends StorageEngineConfigBase {}
+
+  /** Target description read in {@link QueueRSE#getWriter(Store)}; {@code number} picks the queue. */
+  public static class QueueOutputInfo{
+    public int number;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return true;
+  }
+
+  /**
+   * Builds a recorder bound to the queue that the {@link DrillConfig} returns for the
+   * queue number found in the store's target.
+   */
+  @Override
+  public RecordRecorder getWriter(Store store) throws IOException {
+    QueueOutputInfo config = store.getTarget().getWith(dConfig, QueueOutputInfo.class);
+    Queue<Object> q = dConfig.getQueue(config.number);
+    return new QueueRecordRecorder(q);
+  }
+
+  /**
+   * Recorder that adds one {@code byte[]} of JSON per record to the queue and finally adds the
+   * run's {@link OutcomeType}. Static because it needs nothing from the enclosing engine.
+   */
+  private static class QueueRecordRecorder implements RecordRecorder{
+
+    private final Queue<Object> queue;
+
+    public QueueRecordRecorder(Queue<Object> queue) {
+      this.queue = queue;
+    }
+
+    @Override
+    public void setup() throws IOException {
+    }
+
+    /** Serializes the record into its own buffer and enqueues the bytes; always returns 0. */
+    @Override
+    public long recordRecord(RecordPointer r) throws IOException {
+      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      final JSONDataWriter writer = new JSONDataWriter(baos);
+      r.write(writer);
+      writer.finish();
+      queue.add(baos.toByteArray());
+      return 0;
+    }
+
+    /** Enqueues the outcome itself after the records. */
+    @Override
+    public void finish(OutcomeType type) throws IOException {
+      queue.add(type);
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/RSEBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RSEBase.java b/sandbox/prototype/exec/java-exec/rse/RSEBase.java
new file mode 100644
index 0000000..3f86c98
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/RSEBase.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ref.ExecRefConstants;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.exceptions.MajorException;
+import org.apache.drill.exec.ref.rops.ROP;
+
+import com.typesafe.config.Config;
+
+/**
+ * Base {@link ReferenceStorageEngine} that reports no read and no write support and answers every
+ * read or write request with an {@link UnsupportedOperationException}. Subclasses override the
+ * capabilities they actually provide.
+ */
+public abstract class RSEBase implements ReferenceStorageEngine{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RSEBase.class);
+
+  @Override
+  public boolean supportsRead() {
+    return false;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+    throw new UnsupportedOperationException(withClassName("%s does not support reads."));
+  }
+
+  @Override
+  public RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException {
+    throw new UnsupportedOperationException(withClassName("%s does not support reads."));
+  }
+
+  @Override
+  public RecordRecorder getWriter(Store store) throws IOException {
+    throw new UnsupportedOperationException(withClassName("%s does not support writes."));
+  }
+
+  /** Fills the single placeholder of {@code template} with this engine's canonical class name. */
+  private String withClassName(String template){
+    return String.format(template, this.getClass().getCanonicalName());
+  }
+
+  /**
+   * Scans the packages listed under {@code ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES} for
+   * {@link ReferenceStorageEngine} implementations.
+   *
+   * @return the discovered classes as an array
+   */
+  public static Class<?>[] getSubTypes(Config config){
+    final Collection<Class<? extends ReferenceStorageEngine>> found = PathScanner.scanForImplementations(
+        ReferenceStorageEngine.class, config.getStringList(ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    final Class<?>[] subTypes = new Class<?>[found.size()];
+    return found.toArray(subTypes);
+  }
+
+  /**
+   * Narrows a generic {@link ReadEntry} to the concrete entry type an engine expects.
+   *
+   * @throws MajorException when {@code entry} is not an instance of {@code c}
+   */
+  @SuppressWarnings("unchecked")
+  protected <T extends ReadEntry> T getReadEntry(Class<T> c, ReadEntry entry){
+    final Class<?> actual = entry.getClass();
+    if(c.isAssignableFrom(actual)){
+      return (T) entry;
+    }
+    throw new MajorException(String.format("Expected entry type was invalid. Expected entry of type %s but received type of %s.", c.getCanonicalName(), actual.getCanonicalName()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/RSERegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RSERegistry.java b/sandbox/prototype/exec/java-exec/rse/RSERegistry.java
new file mode 100644
index 0000000..4266aac
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/RSERegistry.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ref.ExecRefConstants;
+import org.apache.drill.exec.ref.exceptions.SetupException;
+
+import com.typesafe.config.Config;
+
+public class RSERegistry {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RSERegistry.class);
+
+ private Map<Object, Constructor<? extends ReferenceStorageEngine>> availableEngines = new HashMap<Object, Constructor<? extends ReferenceStorageEngine>>();
+ private Map<StorageEngineConfig, ReferenceStorageEngine> activeEngines = new HashMap<StorageEngineConfig, ReferenceStorageEngine>();
+ private DrillConfig config;
+
+ public RSERegistry(DrillConfig config){
+ this.config = config;
+ setup(config);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void setup(DrillConfig config){
+ Collection<Class<? extends ReferenceStorageEngine>> engines = PathScanner.scanForImplementations(ReferenceStorageEngine.class, config.getStringList(ExecRefConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+ logger.debug("Loading storage engines {}", engines);
+ for(Class<? extends ReferenceStorageEngine> engine: engines){
+ int i =0;
+ for(Constructor<?> c : engine.getConstructors()){
+ Class<?>[] params = c.getParameterTypes();
+ if(params.length != 2 || params[1] == Config.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
+ logger.debug("Skipping ReferenceStorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, Config)]", c, engine);
+ continue;
+ }
+ availableEngines.put(params[0], (Constructor<? extends ReferenceStorageEngine>) c);
+ i++;
+ }
+ if(i == 0){
+ logger.debug("Skipping registration of ReferenceStorageEngine {} as it doesn't have a constructor with the parameters of (StorangeEngineConfig, Config)", engine.getCanonicalName());
+ }
+ }
+ }
+
+ public ReferenceStorageEngine getEngine(StorageEngineConfig engineConfig) throws SetupException{
+ ReferenceStorageEngine engine = activeEngines.get(engineConfig);
+ if(engine != null) return engine;
+ Constructor<? extends ReferenceStorageEngine> c = availableEngines.get(engineConfig.getClass());
+ if(c == null) throw new SetupException(String.format("Failure finding StorageEngine constructor for config %s", engineConfig));
+ try {
+ return c.newInstance(engineConfig, config);
+ } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
+ if(t instanceof SetupException) throw ((SetupException) t);
+ throw new SetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
+ }
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/RecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RecordReader.java b/sandbox/prototype/exec/java-exec/rse/RecordReader.java
new file mode 100644
index 0000000..b7840bc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/RecordReader.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import org.apache.drill.exec.ref.RecordIterator;
+
+/**
+ * Source of records exposed through a {@link RecordIterator}, with setup and cleanup hooks.
+ */
+public interface RecordReader {
+
+  /** Returns the iterator over this reader's records. */
+  RecordIterator getIterator();
+
+  /** Preparation hook; presumably invoked before iteration starts - TODO confirm with callers. */
+  void setup();
+
+  /** Release hook; presumably invoked once reading is over - TODO confirm with callers. */
+  void cleanup();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java b/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java
new file mode 100644
index 0000000..9527b0b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/RecordRecorder.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.RunOutcome;
+
+/**
+ * Sink for records: prepared with {@link #setup()}, fed through
+ * {@link #recordRecord(RecordPointer)} and completed with {@link #finish(RunOutcome.OutcomeType)}.
+ */
+public interface RecordRecorder {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRecorder.class);
+
+  /** Prepares the recorder for use. */
+  void setup() throws IOException;
+
+  /**
+   * Records a single record.
+   *
+   * @return a long whose meaning is left to the implementation
+   */
+  long recordRecord(RecordPointer pointer) throws IOException;
+
+  /** Completes recording, given the outcome type of the run. */
+  void finish(RunOutcome.OutcomeType outcome) throws IOException;
+
+}