You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:49 UTC
[11/27] git commit: Initial Parquet commit. Suports INT, LONG, FLOAT,
DOUBLE, distributed scheduling.
Initial Parquet commit. Suports INT, LONG, FLOAT, DOUBLE, distributed scheduling.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0a2f997f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0a2f997f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0a2f997f
Branch: refs/heads/master
Commit: 0a2f997ff8d95b816238edc78f8ccf4c5cbbb924
Parents: 0a327ed
Author: Jason Altekruse <al...@gmial.com>
Authored: Thu Aug 8 11:50:47 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 16:57:08 2013 -0700
----------------------------------------------------------------------
.../org/apache/drill/common/JSONOptions.java | 5 +-
.../drill/common/logical/LogicalPlan.java | 2 +-
.../common/logical/StorageEngineConfig.java | 2 +
.../common/logical/StorageEngineConfigBase.java | 2 +
.../drill/storage/MockStorageEngineConfig.java | 18 +-
sandbox/prototype/exec/java-exec/pom.xml | 67 ++-
.../templates/VariableLengthVectors.java | 6 +-
.../org/apache/drill/exec/ExecConstants.java | 1 +
.../exec/exception/OptimizerException.java | 17 +-
.../org/apache/drill/exec/ops/QueryContext.java | 7 +
.../apache/drill/exec/opt/BasicOptimizer.java | 205 ++++---
.../drill/exec/physical/ReadEntryFromHDFS.java | 54 ++
.../drill/exec/physical/ReadEntryWithPath.java | 41 ++
.../exec/physical/base/AbstractGroupScan.java | 45 ++
.../physical/base/AbstractPhysicalVisitor.java | 9 +-
.../drill/exec/physical/base/AbstractScan.java | 84 ---
.../drill/exec/physical/base/GroupScan.java | 36 ++
.../apache/drill/exec/physical/base/Leaf.java | 2 +-
.../exec/physical/base/PhysicalOperator.java | 2 +-
.../exec/physical/base/PhysicalVisitor.java | 3 +-
.../apache/drill/exec/physical/base/Scan.java | 21 +-
.../drill/exec/physical/base/SubScan.java | 23 +
.../exec/physical/config/MockGroupScanPOP.java | 221 +++++++
.../exec/physical/config/MockRecordReader.java | 4 +-
.../physical/config/MockScanBatchCreator.java | 6 +-
.../drill/exec/physical/config/MockScanPOP.java | 193 ------
.../exec/physical/config/MockStorageEngine.java | 8 +-
.../exec/physical/config/MockSubScanPOP.java | 115 ++++
.../drill/exec/physical/impl/ImplCreator.java | 45 +-
.../drill/exec/physical/impl/OutputMutator.java | 1 +
.../drill/exec/physical/impl/ScanBatch.java | 6 +
.../exec/physical/impl/SingleSenderCreator.java | 3 +
.../drill/exec/planner/PhysicalPlanReader.java | 14 +-
.../planner/fragment/MakeFragmentsVisitor.java | 9 +-
.../exec/planner/fragment/Materializer.java | 17 +-
.../planner/fragment/SimpleParallelizer.java | 2 +
.../exec/planner/fragment/StatsCollector.java | 19 +-
.../drill/exec/planner/fragment/Wrapper.java | 20 +-
.../exec/record/FragmentWritableBatch.java | 6 +
.../drill/exec/record/RecordBatchLoader.java | 38 +-
.../apache/drill/exec/record/WritableBatch.java | 1 -
.../org/apache/drill/exec/rpc/RpcEncoder.java | 12 +-
.../drill/exec/server/DrillbitContext.java | 10 +-
.../drill/exec/service/ServiceEngine.java | 5 +-
.../drill/exec/store/AbstractStorageEngine.java | 4 +-
.../drill/exec/store/AffinityCalculator.java | 112 ++++
.../apache/drill/exec/store/StorageEngine.java | 12 +-
.../drill/exec/store/StorageEngineRegistry.java | 6 +-
.../apache/drill/exec/store/VectorHolder.java | 9 +-
.../drill/exec/store/parquet/BitReader.java | 87 +++
.../drill/exec/store/parquet/ColumnReader.java | 115 ++++
.../store/parquet/FixedByteAlignedReader.java | 48 ++
.../exec/store/parquet/PageReadStatus.java | 116 ++++
.../exec/store/parquet/ParquetGroupScan.java | 357 +++++++++++
.../exec/store/parquet/ParquetRecordReader.java | 403 +++++++++++++
.../exec/store/parquet/ParquetRowGroupScan.java | 137 +++++
.../store/parquet/ParquetScanBatchCreator.java | 73 +++
.../store/parquet/ParquetStorageEngine.java | 116 ++++
.../parquet/ParquetStorageEngineConfig.java | 66 +++
.../exec/store/parquet/VarLenBinaryReader.java | 130 ++++
.../drill/exec/vector/BaseDataValueVector.java | 6 +-
.../work/AbstractFragmentRunnerListener.java | 4 +-
.../exec/work/RemoteFragmentRunnerListener.java | 45 ++
.../work/RemotingFragmentRunnerListener.java | 48 --
.../exec/work/batch/BitComHandlerImpl.java | 6 +-
.../apache/drill/exec/work/foreman/Foreman.java | 2 +-
.../work/fragment/RemoteFragmentHandler.java | 4 +-
.../parquet/hadoop/CodecFactoryExposer.java | 42 ++
.../src/main/resources/drill-module.conf | 7 +-
.../impl/TestDistributedFragmentRun.java | 5 +-
.../apache/drill/exec/store/ByteArrayUtil.java | 181 ++++++
.../drill/exec/store/JSONRecordReaderTest.java | 5 +
.../apache/drill/exec/store/MockScantTest.java | 115 ++++
.../exec/store/ParquetRecordReaderTest.java | 594 +++++++++++++++++++
.../exec/store/TestAffinityCalculator.java | 229 +++++++
.../exec/store/TestParquetPhysicalPlan.java | 56 ++
.../src/test/resources/drill-module.conf | 4 +-
.../src/test/resources/filter/test1.json | 2 +-
.../test/resources/functions/float4Equal.json | 2 +-
.../resources/functions/float4GreaterThan.json | 2 +-
.../functions/float4GreaterThanEqual.json | 2 +-
.../resources/functions/float4LessThan.json | 2 +-
.../functions/float4LessThanEqual.json | 2 +-
.../resources/functions/float4NotEqual.json | 2 +-
.../test/resources/functions/float8Equal.json | 2 +-
.../resources/functions/float8GreaterThan.json | 2 +-
.../functions/float8GreaterThanEqual.json | 2 +-
.../resources/functions/float8LessThan.json | 2 +-
.../functions/float8LessThanEqual.json | 2 +-
.../resources/functions/float8NotEqual.json | 2 +-
.../src/test/resources/functions/intEqual.json | 2 +-
.../resources/functions/intGreaterThan.json | 2 +-
.../functions/intGreaterThanEqual.json | 2 +-
.../test/resources/functions/intLessThan.json | 2 +-
.../resources/functions/intLessThanEqual.json | 2 +-
.../test/resources/functions/intNotEqual.json | 2 +-
.../src/test/resources/functions/longEqual.json | 2 +-
.../resources/functions/longGreaterThan.json | 2 +-
.../functions/longGreaterThanEqual.json | 2 +-
.../test/resources/functions/longLessThan.json | 2 +-
.../resources/functions/longLessThanEqual.json | 2 +-
.../test/resources/functions/longNotEqual.json | 2 +-
.../functions/nullableBigIntEqual.json | 2 +-
.../functions/nullableBigIntGreaterThan.json | 2 +-
.../nullableBigIntGreaterThanEqual.json | 2 +-
.../functions/nullableBigIntLessThan.json | 2 +-
.../functions/nullableBigIntLessThanEqual.json | 2 +-
.../functions/nullableBigIntNotEqual.json | 2 +-
.../resources/functions/nullableIntEqual.json | 2 +-
.../functions/nullableIntGreaterThan.json | 2 +-
.../functions/nullableIntGreaterThanEqual.json | 2 +-
.../functions/nullableIntLessThan.json | 2 +-
.../functions/nullableIntLessThanEqual.json | 2 +-
.../functions/nullableIntNotEqual.json | 2 +-
.../resources/functions/testByteSubstring.json | 2 +-
.../test/resources/functions/testIsNotNull.json | 2 +-
.../test/resources/functions/testIsNull.json | 2 +-
.../test/resources/functions/testSubstring.json | 2 +-
.../functions/testSubstringNegative.json | 2 +-
.../java-exec/src/test/resources/mock-scan.json | 31 +
.../src/test/resources/parquet_scan_screen.json | 44 ++
.../parquet_scan_union_screen_physical.json | 35 ++
.../src/test/resources/physical_repeated_1.json | 2 +-
.../src/test/resources/project/test1.json | 2 +-
.../src/test/resources/remover/test1.json | 2 +-
.../src/test/resources/sort/one_key_sort.json | 2 +-
.../src/test/resources/sort/two_key_sort.json | 2 +-
.../prototype/exec/java-exec/src/test/sh/runbit | 2 +-
.../org/apache/drill/exec/ref/ROPConverter.java | 5 +-
.../apache/drill/exec/ref/rse/ClasspathRSE.java | 5 +
.../apache/drill/exec/ref/rse/ConsoleRSE.java | 8 +-
.../drill/exec/ref/rse/FileSystemRSE.java | 17 +
.../org/apache/drill/exec/ref/rse/QueueRSE.java | 19 +-
.../java/org/apache/drill/optiq/DrillScan.java | 2 +-
134 files changed, 4196 insertions(+), 617 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
index 8a185a4..d091e17 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/JSONOptions.java
@@ -65,7 +65,10 @@ public class JSONOptions {
public <T> T getListWith(DrillConfig config, TypeReference<T> t) throws IOException {
ObjectMapper mapper = config.getMapper();
return mapper.treeAsTokens(root).readValueAs(t);
- // return mapper.treeToValue(root, mapper.getTypeFactory().constructCollectionType(List.class, c));
+ }
+
+ public <T> T getListWith(ObjectMapper mapper, TypeReference<T> t) throws IOException {
+ return mapper.treeAsTokens(root).readValueAs(t);
}
public JsonNode path(String name){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
index 05fbd1f..742001a 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
@@ -63,7 +63,7 @@ public class LogicalPlan {
return GraphAlgos.TopoSorter.sortLogical(graph);
}
- public StorageEngineConfig getStorageEngine(String name) {
+ public StorageEngineConfig getStorageEngineConfig(String name) {
return storageEngineMap.get(name);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
index 3a893d6..b73a2c1 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfig.java
@@ -23,4 +23,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="type")
public interface StorageEngineConfig{
+
+ public boolean equals(Object o);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
index 853196c..51dbef3 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/StorageEngineConfigBase.java
@@ -35,5 +35,7 @@ public abstract class StorageEngineConfigBase implements StorageEngineConfig{
logger.debug("Adding Storage Engine Configs including {}", (Object) sec );
return sec;
}
+
+ public abstract boolean equals(Object o);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/common/src/test/java/org/apache/drill/storage/MockStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/test/java/org/apache/drill/storage/MockStorageEngineConfig.java b/sandbox/prototype/common/src/test/java/org/apache/drill/storage/MockStorageEngineConfig.java
index bc24b2e..5843c38 100644
--- a/sandbox/prototype/common/src/test/java/org/apache/drill/storage/MockStorageEngineConfig.java
+++ b/sandbox/prototype/common/src/test/java/org/apache/drill/storage/MockStorageEngineConfig.java
@@ -39,5 +39,21 @@ public class MockStorageEngineConfig extends StorageEngineConfigBase{
return url;
}
-
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ MockStorageEngineConfig that = (MockStorageEngineConfig) o;
+
+ if (url != null ? !url.equals(that.url) : that.url != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return url != null ? url.hashCode() : 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index 4b13952..cd9bc9a 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<project
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
- xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>exec-parent</artifactId>
@@ -46,7 +46,12 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-column</artifactId>
- <version>1.0.0</version>
+ <version>1.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>1.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
@@ -101,16 +106,57 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
- <version>1.1.0</version>
+ <version>1.0.3-mapr-2.1.2.1</version>
<exclusions>
<exclusion>
<artifactId>jets3t</artifactId>
<groupId>net.java.dev.jets3t</groupId>
</exclusion>
<exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+
+ <exclusion>
+ <artifactId>mockito-all</artifactId>
+ <groupId>org.mockito</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-logging-api</artifactId>
+ <groupId>commons-logging</groupId>
+ </exclusion>
+ <exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>servlet-api-2.5</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jasper-runtime</artifactId>
+ <groupId>tomcat</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jasper-compiler</artifactId>
+ <groupId>tomcat</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>core</artifactId>
+ <groupId>org.eclipse.jdt</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -191,13 +237,13 @@
</fileset>
</path>
<pathconvert pathsep=" " property="proto.files"
- refid="proto.path.files" />
+ refid="proto.path.files" />
<exec executable="protoc">
<arg value="--java_out=${target.gen.source.path}" />
<arg value="--proto_path=${proto.cas.path}" />
<arg
- value="--proto_path=${project.basedir}/../../common/src/main/protobuf/" />
+ value="--proto_path=${project.basedir}/../../common/src/main/protobuf/" />
<arg line="${proto.files}" />
</exec>
</tasks>
@@ -244,4 +290,13 @@
</plugins>
</build>
+
+ <repositories>
+ <repository>
+ <id>mapr-releases</id>
+ <url>http://repository.mapr.com/maven/</url>
+ <snapshots><enabled>false</enabled></snapshots>
+ <releases><enabled>true</enabled></releases>
+ </repository>
+ </repositories>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
index 061234c..7ceafe4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
@@ -21,11 +21,9 @@ import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.record.DeadBuf;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
-import org.mortbay.jetty.servlet.Holder;
import com.google.common.base.Charsets;
-import antlr.collections.impl.Vector;
/**
* ${minor.class}Vector implements a vector of variable width values. Elements in the vector
@@ -197,6 +195,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public int getValueCount() {
return valueCount;
}
+
+ public UInt${type.width}Vector getOffsetVector(){
+ return offsetVector;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 4eb0f4c..5e7ddf0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -30,4 +30,5 @@ public interface ExecConstants {
public static final String INITIAL_USER_PORT = "drill.exec.rpc.user.port";
public static final String METRICS_CONTEXT_NAME = "drill.exec.metrics.context";
public static final String FUNCTION_PACKAGES = "drill.exec.functions";
+ public static final String USE_IP_ADDRESS = "drill.exec.rpc.use.ip";
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OptimizerException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OptimizerException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OptimizerException.java
index c57ce4a..ac56afb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OptimizerException.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OptimizerException.java
@@ -1,14 +1,13 @@
package org.apache.drill.exec.exception;
-/**
- * Created with IntelliJ IDEA.
- * User: jaltekruse
- * Date: 6/11/13
- * Time: 5:37 PM
- * To change this template use File | Settings | File Templates.
- */
-public class OptimizerException extends Exception{
- public OptimizerException(String s) {
+import org.apache.drill.common.exceptions.DrillException;
+
+public class OptimizerException extends DrillException {
+ public OptimizerException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public OptimizerException(String s) {
super(s);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 1c251b8..44117ff 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -19,12 +19,15 @@ package org.apache.drill.exec.ops;
import java.util.Collection;
+import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.exception.SetupException;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.rpc.bit.BitCom;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngine;
public class QueryContext {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
@@ -46,6 +49,10 @@ public class QueryContext {
return queryId;
}
+ public StorageEngine getStorageEngine(StorageEngineConfig config) throws SetupException {
+ return drillbitContext.getStorageEngine(config);
+ }
+
public DistributedCache getCache(){
return drillbitContext.getCache();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 289ec4b..c4a7e43 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -2,22 +2,28 @@ package org.apache.drill.exec.opt;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.drill.common.PlanProperties;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.expression.*;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.FunctionDefinition;
+import org.apache.drill.common.expression.NoArgValidator;
+import org.apache.drill.common.expression.OutputTypeDeterminer;
import org.apache.drill.common.logical.LogicalPlan;
-import org.apache.drill.common.logical.data.*;
import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.SinkOperator;
+import org.apache.drill.common.logical.data.Store;
import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.exception.OptimizerException;
+import org.apache.drill.exec.exception.SetupException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -27,112 +33,127 @@ import com.fasterxml.jackson.core.type.TypeReference;
public class BasicOptimizer extends Optimizer{
- private DrillConfig config;
- private QueryContext context;
-
- public BasicOptimizer(DrillConfig config, QueryContext context){
- this.config = config;
- this.context = context;
+ private DrillConfig config;
+ private QueryContext context;
+
+ public BasicOptimizer(DrillConfig config, QueryContext context){
+ this.config = config;
+ this.context = context;
+ }
+
+ @Override
+ public void init(DrillConfig config) {
+
+ }
+
+ @Override
+ public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) {
+ Object obj = new Object();
+ Collection<SinkOperator> roots = plan.getGraph().getRoots();
+ List<PhysicalOperator> physOps = new ArrayList<PhysicalOperator>(roots.size());
+ LogicalConverter converter = new LogicalConverter(plan);
+ for ( SinkOperator op : roots){
+ try {
+ PhysicalOperator pop = op.accept(converter, obj);
+ System.out.println(pop);
+ physOps.add(pop);
+ } catch (OptimizerException e) {
+ e.printStackTrace();
+ } catch (Throwable throwable) {
+ throwable.printStackTrace();
+ }
}
- @Override
- public void init(DrillConfig config) {
+ PlanProperties props = new PlanProperties();
+ props.type = PlanProperties.PlanType.APACHE_DRILL_PHYSICAL;
+ props.version = plan.getProperties().version;
+ props.generator = plan.getProperties().generator;
+ PhysicalPlan p = new PhysicalPlan(props, physOps);
+ return p;
+ //return new PhysicalPlan(props, physOps);
+ }
- }
+ @Override
+ public void close() {
- @Override
- public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) {
- Object obj = new Object();
- Collection<SinkOperator> roots = plan.getGraph().getRoots();
- List<PhysicalOperator> physOps = new ArrayList<PhysicalOperator>(roots.size());
- LogicalConverter converter = new LogicalConverter();
- for ( SinkOperator op : roots){
- try {
- PhysicalOperator pop = op.accept(converter, obj);
- System.out.println(pop);
- physOps.add(pop);
- } catch (OptimizerException e) {
- e.printStackTrace();
- } catch (Throwable throwable) {
- throwable.printStackTrace();
- }
- }
+ }
- PlanProperties props = new PlanProperties();
- props.type = PlanProperties.PlanType.APACHE_DRILL_PHYSICAL;
- props.version = plan.getProperties().version;
- props.generator = plan.getProperties().generator;
- return new PhysicalPlan(props, physOps);
- }
+ public static class BasicOptimizationContext implements OptimizationContext {
@Override
- public void close() {
-
+ public int getPriority() {
+ return 1;
}
+ }
- public static class BasicOptimizationContext implements OptimizationContext {
+ private class LogicalConverter extends AbstractLogicalVisitor<PhysicalOperator, Object, OptimizerException> {
- @Override
- public int getPriority() {
- return 1;
- }
+ // storing a reference to the plan for access to other elements outside of the query graph
+ // such as the storage engine configs
+ LogicalPlan logicalPlan;
+
+ public LogicalConverter(LogicalPlan logicalPlan){
+ this.logicalPlan = logicalPlan;
}
- private class LogicalConverter extends AbstractLogicalVisitor<PhysicalOperator, Object, OptimizerException> {
-
- @Override
- public MockScanPOP visitScan(Scan scan, Object obj) throws OptimizerException {
- List<MockScanPOP.MockScanEntry> myObjects;
-
- try {
- if ( scan.getStorageEngine().equals("local-logs")){
- myObjects = scan.getSelection().getListWith(config,
- new TypeReference<ArrayList<MockScanPOP.MockScanEntry>>() {
- });
- }
- else{
- myObjects = new ArrayList<>();
- MockScanPOP.MockColumn[] cols = {
- new MockScanPOP.MockColumn("RED", MinorType.BIGINT, DataMode.REQUIRED, null, null, null),
- new MockScanPOP.MockColumn("GREEN", MinorType.BIGINT, DataMode.REQUIRED,null, null, null)
- };
- myObjects.add(new MockScanPOP.MockScanEntry(100, cols));
- }
- } catch (IOException e) {
- e.printStackTrace();
- throw new OptimizerException("Error reading selection attribute of Scan node in Logical to Physical plan conversion.");
- }
-
- return new MockScanPOP("http://apache.org", myObjects);
- }
+ @Override
+ public PhysicalOperator visitScan(Scan scan, Object obj) throws OptimizerException {
+ List<MockGroupScanPOP.MockScanEntry> myObjects;
- @Override
- public Screen visitStore(Store store, Object obj) throws OptimizerException {
- if ( ! store.iterator().hasNext()){
- throw new OptimizerException("Store node in logical plan does not have a child.");
- }
- return new Screen(store.iterator().next().accept(this, obj), context.getCurrentEndpoint());
+ try {
+ if (scan.getStorageEngine().equals("parquet")) {
+ return context.getStorageEngine(logicalPlan.getStorageEngineConfig(scan.getStorageEngine())).getPhysicalScan(scan);
}
-
- @Override
- public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException {
- return project.getInput().accept(this, obj);
-// return new org.apache.drill.exec.physical.config.Project(
-// Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
+ if (scan.getStorageEngine().equals("local-logs")) {
+ myObjects = scan.getSelection().getListWith(config,
+ new TypeReference<ArrayList<MockGroupScanPOP.MockScanEntry>>() {
+ });
+ } else {
+ myObjects = new ArrayList<>();
+ MockGroupScanPOP.MockColumn[] cols = {
+ new MockGroupScanPOP.MockColumn("blah", MinorType.INT, DataMode.REQUIRED, 4, 4, 4),
+ new MockGroupScanPOP.MockColumn("blah_2", MinorType.INT, DataMode.REQUIRED, 4, 4, 4) };
+ myObjects.add(new MockGroupScanPOP.MockScanEntry(50, cols));
}
+ } catch (IOException e) {
+ throw new OptimizerException(
+ "Error reading selection attribute of GroupScan node in Logical to Physical plan conversion.", e);
+ } catch (SetupException e) {
+ throw new OptimizerException(
+ "Storage engine not found: " + scan.getStorageEngine(), e);
+ }
+
+ return new MockGroupScanPOP("http://apache.org", myObjects);
+ }
- @Override
- public PhysicalOperator visitFilter(Filter filter, Object obj) throws OptimizerException {
- TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType();
- b.setMode(DataMode.REQUIRED);
- b.setMinorType(MinorType.BIGINT);
-
- return new SelectionVectorRemover(new org.apache.drill.exec.physical.config.Filter(
- filter.iterator().next().accept(this, obj), /*filter.getExpr() */
- new FunctionCall(FunctionDefinition.simple("alternate", new NoArgValidator(),
- new OutputTypeDeterminer.FixedType(b.build())), null, new ExpressionPosition("asdf", 1)),
- 1.0f));
+ @Override
+ public PhysicalOperator visitStore(Store store, Object obj) throws OptimizerException {
+ if (!store.iterator().hasNext()) {
+ throw new OptimizerException("Store node in logical plan does not have a child.");
}
+ return new Screen(store.iterator().next().accept(this, obj), context.getCurrentEndpoint());
+ }
+
+ @Override
+ public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException {
+ return project.getInput().accept(this, obj);
+ // return new org.apache.drill.exec.physical.config.Project(
+ // Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
+ }
+
+ @Override
+ public PhysicalOperator visitFilter(Filter filter, Object obj) throws OptimizerException {
+ TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType();
+ b.setMode(DataMode.REQUIRED);
+ b.setMinorType(MinorType.BIGINT);
+
+ return new SelectionVectorRemover(new org.apache.drill.exec.physical.config.Filter(filter.iterator().next()
+ .accept(this, obj), /* filter.getExpr() */
+ new FunctionCall(FunctionDefinition.simple("alternate", new NoArgValidator(), new OutputTypeDeterminer.FixedType(
+ b.build())), null, new ExpressionPosition("asdf", 1)), 1.0f));
}
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryFromHDFS.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryFromHDFS.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryFromHDFS.java
new file mode 100644
index 0000000..b90f1bc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryFromHDFS.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.physical.base.Size;
+
+public class ReadEntryFromHDFS extends ReadEntryWithPath {
+
+ private long start;
+ private long length;
+
+ @JsonCreator
+ public ReadEntryFromHDFS(@JsonProperty("path") String path,@JsonProperty("start") long start, @JsonProperty("length") long length) {
+ this.path = path;
+ this.start = start;
+ this.length = length;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1, 2, 1, 1);
+ }
+
+ @Override
+ public Size getSize() {
+ // TODO - these values are wrong, I cannot know these until after I read a file
+ return new Size(10, 10);
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public long getLength() {
+ return length;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryWithPath.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryWithPath.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryWithPath.java
new file mode 100644
index 0000000..57d1d0b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntryWithPath.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import org.apache.drill.exec.physical.base.Size;
+
+public class ReadEntryWithPath implements ReadEntry {
+
+ protected String path;
+
+ public String getPath(){
+ return path;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
+ "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no cost.");
+ }
+
+ @Override
+ public Size getSize() {
+ throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
+ "selections on a scan node from a logical plan, it cannot be used in an executing plan and has no size.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
new file mode 100644
index 0000000..9691f08
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.physical.ReadEntry;
+
+import com.google.common.collect.Iterators;
+
+public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractGroupScan.class);
+
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return Iterators.emptyIterator();
+ }
+
+ @Override
+ public boolean isExecutable() {
+ return false;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+ return physicalVisitor.visitGroupScan(this, value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index f782325..3b58803 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -62,8 +62,13 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
}
@Override
- public T visitScan(Scan<?> scan, X value) throws E{
- return visitOp(scan, value);
+ public T visitGroupScan(GroupScan groupScan, X value) throws E{
+ return visitOp(groupScan, value);
+ }
+
+ @Override
+ public T visitSubScan(SubScan subScan, X value) throws E{
+ return visitOp(subScan, value);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
deleted file mode 100644
index dbde9c5..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractScan.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.base;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Iterators;
-
-public abstract class AbstractScan<R extends ReadEntry> extends AbstractBase implements Scan<R>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractScan.class);
-
- protected final List<R> readEntries;
- private final OperatorCost cost;
- private final Size size;
-
- public AbstractScan(List<R> readEntries) {
- this.readEntries = readEntries;
- OperatorCost cost = new OperatorCost(0,0,0,0);
- Size size = new Size(0,0);
- for(R r : readEntries){
- cost = cost.add(r.getCost());
- size = size.add(r.getSize());
- }
- this.cost = cost;
- this.size = size;
- }
-
- @Override
- @JsonProperty("entries")
- public List<R> getReadEntries() {
- return readEntries;
- }
-
- @Override
- public Iterator<PhysicalOperator> iterator() {
- return Iterators.emptyIterator();
- }
-
- @Override
- public boolean isExecutable() {
- return true;
- }
-
- @Override
- public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
- return physicalVisitor.visitScan(this, value);
- }
-
- @Override
- public OperatorCost getCost() {
- return cost;
- }
-
- @Override
- public Size getSize() {
- return size;
- }
-
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
new file mode 100644
index 0000000..acafd6c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public interface GroupScan extends Scan, HasAffinity{
+
+ public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);
+
+ public abstract SubScan getSpecificScan(int minorFragmentId);
+
+ @JsonIgnore
+ public int getMaxParallelizationWidth();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
index d4ed456..7764739 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Leaf.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.base;
/**
* An operator which specifically is a lowest level leaf node of a query plan across all possible fragments. Currently, the only operator that is a Leaf
- * node are Scan nodes. Ultimately this could include use of Cache scans and other types of atypical data production systems.
+ * node are GroupScan nodes. Ultimately this could include use of Cache scans and other types of atypical data production systems.
*/
public interface Leaf extends FragmentLeaf {
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index d412c2d..c24836b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -34,7 +34,7 @@ import com.fasterxml.jackson.annotation.ObjectIdGenerators;
@JsonPropertyOrder({ "@id" })
@JsonIdentityInfo(generator = ObjectIdGenerators.IntSequenceGenerator.class, property = "@id")
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop")
-public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
+public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
/**
* Get the cost of execution of this particular operator.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index f36633f..8e09e3a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -39,7 +39,8 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitExchange(Exchange exchange, EXTRA value) throws EXCEP;
- public RETURN visitScan(Scan<?> scan, EXTRA value) throws EXCEP;
+ public RETURN visitGroupScan(GroupScan groupScan, EXTRA value) throws EXCEP;
+ public RETURN visitSubScan(SubScan subScan, EXTRA value) throws EXCEP;
public RETURN visitStore(Store store, EXTRA value) throws EXCEP;
public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
index 2207f79..f56e9f9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Scan.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,20 +17,7 @@
******************************************************************************/
package org.apache.drill.exec.physical.base;
-import java.util.List;
-
import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public interface Scan<R extends ReadEntry> extends Leaf, HasAffinity{
-
- @JsonProperty("entries")
- public abstract List<R> getReadEntries();
-
- public abstract void applyAssignments(List<DrillbitEndpoint> endpoints);
-
- public abstract Scan<?> getSpecificScan(int minorFragmentId);
-}
\ No newline at end of file
+public interface Scan extends Leaf {
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
new file mode 100644
index 0000000..f75ba19
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SubScan.java
@@ -0,0 +1,23 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.physical.ReadEntry;
+
+public interface SubScan extends Scan {
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
new file mode 100644
index 0000000..a28c7d8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockGroupScanPOP.java
@@ -0,0 +1,221 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("mock-scan")
+public class MockGroupScanPOP extends AbstractGroupScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
+
+ private final String url;
+ protected final List<MockScanEntry> readEntries;
+ private final OperatorCost cost;
+ private final Size size;
+ private LinkedList<MockScanEntry>[] mappings;
+
+ @JsonCreator
+ public MockGroupScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
+ this.readEntries = readEntries;
+ OperatorCost cost = new OperatorCost(0,0,0,0);
+ Size size = new Size(0,0);
+ for(MockScanEntry r : readEntries){
+ cost = cost.add(r.getCost());
+ size = size.add(r.getSize());
+ }
+ this.cost = cost;
+ this.size = size;
+ this.url = url;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ @JsonProperty("entries")
+ public List<MockScanEntry> getReadEntries() {
+ return readEntries;
+ }
+
+ public static class MockScanEntry implements ReadEntry {
+
+ private final int records;
+ private final MockColumn[] types;
+ private final int recordSize;
+
+
+ @JsonCreator
+ public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
+ this.records = records;
+ this.types = types;
+ int size = 0;
+ for(MockColumn dt : types){
+ size += TypeHelper.getSize(dt.getMajorType());
+ }
+ this.recordSize = size;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1, 2, 1, 1);
+ }
+
+ public int getRecords() {
+ return records;
+ }
+
+ public MockColumn[] getTypes() {
+ return types;
+ }
+
+ @Override
+ public Size getSize() {
+ return new Size(records, recordSize);
+ }
+ }
+
+ @JsonInclude(Include.NON_NULL)
+ public static class MockColumn{
+ @JsonProperty("type") public MinorType minorType;
+ public String name;
+ public DataMode mode;
+ public Integer width;
+ public Integer precision;
+ public Integer scale;
+
+
+ @JsonCreator
+ public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
+ this.name = name;
+ this.minorType = minorType;
+ this.mode = mode;
+ this.width = width;
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ @JsonProperty("type")
+ public MinorType getMinorType() {
+ return minorType;
+ }
+ public String getName() {
+ return name;
+ }
+ public DataMode getMode() {
+ return mode;
+ }
+ public Integer getWidth() {
+ return width;
+ }
+ public Integer getPrecision() {
+ return precision;
+ }
+ public Integer getScale() {
+ return scale;
+ }
+
+ @JsonIgnore
+ public MajorType getMajorType(){
+ MajorType.Builder b = MajorType.newBuilder();
+ b.setMode(mode);
+ b.setMinorType(minorType);
+ if(precision != null) b.setPrecision(precision);
+ if(width != null) b.setWidth(width);
+ if(scale != null) b.setScale(scale);
+ return b.build();
+ }
+
+ }
+
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ return Collections.emptyList();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+ Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
+
+ mappings = new LinkedList[endpoints.size()];
+
+ int i =0;
+ for(MockScanEntry e : this.getReadEntries()){
+ if(i == endpoints.size()) i -= endpoints.size();
+ LinkedList<MockScanEntry> entries = mappings[i];
+ if(entries == null){
+ entries = new LinkedList<MockScanEntry>();
+ mappings[i] = entries;
+ }
+ entries.add(e);
+ i++;
+ }
+ }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) {
+ assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
+ return new MockSubScanPOP(url, mappings[minorFragmentId]);
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return readEntries.size();
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return cost;
+ }
+
+ @Override
+ public Size getSize() {
+ return size;
+ }
+
+ @Override
+ @JsonIgnore
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new MockGroupScanPOP(url, readEntries);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
index 11b9243..bd57823 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockRecordReader.java
@@ -23,8 +23,8 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MockScanPOP.MockColumn;
-import org.apache.drill.exec.physical.config.MockScanPOP.MockScanEntry;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockColumn;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.RecordReader;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
index bfc19af..a06aaee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanBatchCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.config.MockScanPOP.MockScanEntry;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -30,11 +30,11 @@ import org.apache.drill.exec.store.RecordReader;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class MockScanBatchCreator implements BatchCreator<MockScanPOP>{
+public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, MockScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ public RecordBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<MockScanEntry> entries = config.getReadEntries();
List<RecordReader> readers = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
deleted file mode 100644
index 151d541..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockScanPOP.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.AbstractScan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Scan;
-import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.vector.TypeHelper;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-
-@JsonTypeName("mock-scan")
-public class MockScanPOP extends AbstractScan<MockScanPOP.MockScanEntry> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanPOP.class);
-
- private final String url;
- private LinkedList<MockScanEntry>[] mappings;
-
- @JsonCreator
- public MockScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockScanEntry> readEntries) {
- super(readEntries);
- this.url = url;
- }
-
- public String getUrl() {
- return url;
- }
-
-
- public static class MockScanEntry implements ReadEntry {
-
- private final int records;
- private final MockColumn[] types;
- private final int recordSize;
-
-
- @JsonCreator
- public MockScanEntry(@JsonProperty("records") int records, @JsonProperty("types") MockColumn[] types) {
- this.records = records;
- this.types = types;
- int size = 0;
- for(MockColumn dt : types){
- size += TypeHelper.getSize(dt.getMajorType());
- }
- this.recordSize = size;
- }
-
- @Override
- public OperatorCost getCost() {
- return new OperatorCost(1, 2, 1, 1);
- }
-
-
- public int getRecords() {
- return records;
- }
-
- public MockColumn[] getTypes() {
- return types;
- }
-
- @Override
- public Size getSize() {
- return new Size(records, recordSize);
- }
- }
-
- @JsonInclude(Include.NON_NULL)
- public static class MockColumn{
- @JsonProperty("type") public MinorType minorType;
- public String name;
- public DataMode mode;
- public Integer width;
- public Integer precision;
- public Integer scale;
-
-
- @JsonCreator
- public MockColumn(@JsonProperty("name") String name, @JsonProperty("type") MinorType minorType, @JsonProperty("mode") DataMode mode, @JsonProperty("width") Integer width, @JsonProperty("precision") Integer precision, @JsonProperty("scale") Integer scale) {
- this.name = name;
- this.minorType = minorType;
- this.mode = mode;
- this.width = width;
- this.precision = precision;
- this.scale = scale;
- }
-
- @JsonProperty("type")
- public MinorType getMinorType() {
- return minorType;
- }
- public String getName() {
- return name;
- }
- public DataMode getMode() {
- return mode;
- }
- public Integer getWidth() {
- return width;
- }
- public Integer getPrecision() {
- return precision;
- }
- public Integer getScale() {
- return scale;
- }
-
- @JsonIgnore
- public MajorType getMajorType(){
- MajorType.Builder b = MajorType.newBuilder();
- b.setMode(mode);
- b.setMinorType(minorType);
- if(precision != null) b.setPrecision(precision);
- if(width != null) b.setWidth(width);
- if(scale != null) b.setScale(scale);
- return b.build();
- }
-
- }
-
- @Override
- public List<EndpointAffinity> getOperatorAffinity() {
- return Collections.emptyList();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void applyAssignments(List<DrillbitEndpoint> endpoints) {
- Preconditions.checkArgument(endpoints.size() <= getReadEntries().size());
-
- mappings = new LinkedList[endpoints.size()];
-
- int i =0;
- for(MockScanEntry e : this.getReadEntries()){
- if(i == endpoints.size()) i -= endpoints.size();
- LinkedList<MockScanEntry> entries = mappings[i];
- if(entries == null){
- entries = new LinkedList<MockScanEntry>();
- mappings[i] = entries;
- }
- entries.add(e);
- i++;
- }
- }
-
- @Override
- public Scan<?> getSpecificScan(int minorFragmentId) {
- assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
- return new MockScanPOP(url, mappings[minorFragmentId]);
- }
-
- @Override
- @JsonIgnore
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- Preconditions.checkArgument(children.isEmpty());
- return new MockScanPOP(url, readEntries);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
index 0044628..6348686 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
@@ -22,11 +22,10 @@ import java.util.Collection;
import org.apache.drill.common.logical.data.Scan;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.ReadEntry;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.AbstractStorageEngine;
import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.StorageEngine;
-import org.apache.drill.exec.store.StorageEngine.ReadEntry;
import com.google.common.collect.ListMultimap;
@@ -39,11 +38,6 @@ public class MockStorageEngine extends AbstractStorageEngine{
}
@Override
- public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
- return null;
- }
-
- @Override
public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
new file mode 100644
index 0000000..7380617
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockSubScanPOP.java
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.vector.TypeHelper;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("mock-sub-scan")
+public class MockSubScanPOP extends AbstractBase implements SubScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockGroupScanPOP.class);
+
+ private final String url;
+ protected final List<MockGroupScanPOP.MockScanEntry> readEntries;
+ private final OperatorCost cost;
+ private final Size size;
+ private LinkedList<MockGroupScanPOP.MockScanEntry>[] mappings;
+
+ @JsonCreator
+ public MockSubScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockGroupScanPOP.MockScanEntry> readEntries) {
+ this.readEntries = readEntries;
+ OperatorCost cost = new OperatorCost(0,0,0,0);
+ Size size = new Size(0,0);
+ for(MockGroupScanPOP.MockScanEntry r : readEntries){
+ cost = cost.add(r.getCost());
+ size = size.add(r.getSize());
+ }
+ this.cost = cost;
+ this.size = size;
+ this.url = url;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ @JsonProperty("entries")
+ public List<MockGroupScanPOP.MockScanEntry> getReadEntries() {
+ return readEntries;
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return Iterators.emptyIterator();
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Size getSize() {
+ throw new UnsupportedOperationException();
+ }
+
+ // will want to replace these two methods with an interface above for AbstractSubScan
+ @Override
+ public boolean isExecutable() {
+ return true; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+ return physicalVisitor.visitSubScan(this, value);
+ }
+ // see comment above about replacing this
+
+ @Override
+ @JsonIgnore
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new MockSubScanPOP(url, readEntries);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 1c15289..61c9383 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,6 +26,15 @@ import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.physical.config.MockScanBatchCreator;
+import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.base.*;
import org.apache.drill.exec.physical.config.*;
import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
@@ -35,11 +44,15 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
private MockScanBatchCreator msc = new MockScanBatchCreator();
+ private ParquetScanBatchCreator parquetScan = new ParquetScanBatchCreator();
private ScreenCreator sc = new ScreenCreator();
private RandomReceiverCreator rrc = new RandomReceiverCreator();
private SingleSenderCreator ssc = new SingleSenderCreator();
@@ -48,22 +61,25 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private SVRemoverCreator svc = new SVRemoverCreator();
private SortBatchCreator sbc = new SortBatchCreator();
private RootExec root = null;
-
+
private ImplCreator(){}
-
+
public RootExec getRoot(){
return root;
}
-
+
@Override
public RecordBatch visitProject(Project op, FragmentContext context) throws ExecutionSetupException {
return pbc.getBatch(context, op, getChildren(op, context));
}
@Override
- public RecordBatch visitScan(Scan<?> scan, FragmentContext context) throws ExecutionSetupException {
- Preconditions.checkNotNull(scan);
+ public RecordBatch visitSubScan(SubScan subScan, FragmentContext context) throws ExecutionSetupException {
+ Preconditions.checkNotNull(subScan);
Preconditions.checkNotNull(context);
+
+ if(subScan instanceof MockSubScanPOP){
+ return msc.getBatch(context, (MockSubScanPOP) subScan, Collections.<RecordBatch> emptyList());
if(scan instanceof MockScanPOP){
return msc.getBatch(context, (MockScanPOP) scan, Collections.<RecordBatch>emptyList());
@@ -72,16 +88,21 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
}else{
return super.visitScan(scan, context);
}
-
+ else if (subScan instanceof ParquetRowGroupScan){
+ return parquetScan.getBatch(context, (ParquetRowGroupScan) subScan, Collections.<RecordBatch> emptyList());
+ }
+ else{
+ return super.visitSubScan(subScan, context);
+ }
+
}
-
@Override
public RecordBatch visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException {
if(op instanceof SelectionVectorRemover){
return svc.getBatch(context, (SelectionVectorRemover) op, getChildren(op, context));
}else{
- return super.visitOp(op, context);
+ return super.visitOp(op, context);
}
}
@@ -97,7 +118,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
root = sc.getRoot(context, op, getChildren(op, context));
return null;
}
-
+
@Override
public RecordBatch visitFilter(Filter filter, FragmentContext context) throws ExecutionSetupException {
return fbc.getBatch(context, filter, getChildren(filter, context));
@@ -121,7 +142,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
}
return children;
}
-
+
public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException{
ImplCreator i = new ImplCreator();
root.accept(i, context);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index 7e72683..3e9f1e2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -24,5 +24,6 @@ import org.apache.drill.exec.vector.ValueVector;
public interface OutputMutator {
public void removeField(MaterializedField field) throws SchemaChangeException;
public void addField(ValueVector vector) throws SchemaChangeException ;
+ public void removeAllFields();
public void setNewSchema() throws SchemaChangeException;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 5a543b0..4227450 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -161,6 +161,12 @@ public class ScanBatch implements RecordBatch {
}
@Override
+ public void removeAllFields() {
+ holder.clear();
+ fieldVectorMap.clear();
+ }
+
+ @Override
public void setNewSchema() throws SchemaChangeException {
ScanBatch.this.schema = this.builder.build();
ScanBatch.this.schemaChanged = true;