You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/02 05:03:12 UTC
[12/13] Merge Jason's SQL updates to work with full exec. Random
vector updates including changing to copyFrom Fixes to writable batch. Add an
alternative ByteBuf implementation that leverages little endianness.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index dcaf823..289ec4b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -2,35 +2,29 @@ package org.apache.drill.exec.opt;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.drill.common.PlanProperties;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.*;
import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.*;
+import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.Project;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.logical.data.SinkOperator;
-import org.apache.drill.common.logical.data.Store;
import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.MockScanPOP;
-import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.*;
import com.fasterxml.jackson.core.type.TypeReference;
-/**
- * Created with IntelliJ IDEA.
- * User: jaltekruse
- * Date: 6/11/13
- * Time: 5:32 PM
- * To change this template use File | Settings | File Templates.
- */
public class BasicOptimizer extends Optimizer{
private DrillConfig config;
@@ -58,9 +52,9 @@ public class BasicOptimizer extends Optimizer{
System.out.println(pop);
physOps.add(pop);
} catch (OptimizerException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
} catch (Throwable throwable) {
- throwable.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ throwable.printStackTrace();
}
}
@@ -99,9 +93,11 @@ public class BasicOptimizer extends Optimizer{
}
else{
myObjects = new ArrayList<>();
- MockScanPOP.MockColumn[] cols = { new MockScanPOP.MockColumn("blah", MinorType.INT, DataMode.REQUIRED,4,4,4),
- new MockScanPOP.MockColumn("blah_2", MinorType.INT, DataMode.REQUIRED,4,4,4)};
- myObjects.add(new MockScanPOP.MockScanEntry(50, cols));
+ MockScanPOP.MockColumn[] cols = {
+ new MockScanPOP.MockColumn("RED", MinorType.BIGINT, DataMode.REQUIRED, null, null, null),
+ new MockScanPOP.MockColumn("GREEN", MinorType.BIGINT, DataMode.REQUIRED,null, null, null)
+ };
+ myObjects.add(new MockScanPOP.MockScanEntry(100, cols));
}
} catch (IOException e) {
e.printStackTrace();
@@ -121,7 +117,22 @@ public class BasicOptimizer extends Optimizer{
@Override
public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException {
- return project.getInput().accept(this, obj);
+ return project.getInput().accept(this, obj);
+// return new org.apache.drill.exec.physical.config.Project(
+// Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
}
+
+ @Override
+ public PhysicalOperator visitFilter(Filter filter, Object obj) throws OptimizerException {
+ TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType();
+ b.setMode(DataMode.REQUIRED);
+ b.setMinorType(MinorType.BIGINT);
+
+ return new SelectionVectorRemover(new org.apache.drill.exec.physical.config.Filter(
+ filter.iterator().next().accept(this, obj), /*filter.getExpr() */
+ new FunctionCall(FunctionDefinition.simple("alternate", new NoArgValidator(),
+ new OutputTypeDeterminer.FixedType(b.build())), null, new ExpressionPosition("asdf", 1)),
+ 1.0f));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
index bf3cd91..18aa484 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
@@ -55,15 +55,15 @@ public abstract class FilterTemplate implements Filterer{
private void filterBatchSV2(int recordCount){
int svIndex = 0;
- final int count = recordCount*2;
- for(int i = 0; i < count; i+=2){
+ final int count = recordCount;
+ for(int i = 0; i < count; i++){
char index = incomingSelectionVector.getIndex(i);
if(doEval(i, 0)){
outgoingSelectionVector.setIndex(svIndex, index);
- svIndex+=2;
+ svIndex++;
}
}
- outgoingSelectionVector.setRecordCount(svIndex/2);
+ outgoingSelectionVector.setRecordCount(svIndex);
}
private void filterBatchNoSV(int recordCount){
@@ -72,10 +72,10 @@ public abstract class FilterTemplate implements Filterer{
if(doEval(i, 0)){
outgoingSelectionVector.setIndex(svIndex, i);
- svIndex+=2;
+ svIndex++;
}
}
- outgoingSelectionVector.setRecordCount(svIndex/2);
+ outgoingSelectionVector.setRecordCount(svIndex);
}
protected abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index 7929296..85d7991 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -43,6 +43,7 @@ public class VectorRecordMaterializer implements RecordMaterializer{
}
public QueryWritableBatch convertNext(boolean isLast) {
+ //batch.getWritableBatch().getDef().getRecordCount()
WritableBatch w = batch.getWritableBatch();
QueryResult header = QueryResult.newBuilder() //
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 8d47678..5f15c2d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -31,8 +31,8 @@ public abstract class ProjectorTemplate implements Projector {
case TWO_BYTE:
- final int count = recordCount*2;
- for(int i = 0; i < count; i+=2, firstOutputIndex++){
+ final int count = recordCount;
+ for(int i = 0; i < count; i++, firstOutputIndex++){
doEval(vector2.getIndex(i), firstOutputIndex);
}
return recordCount;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
index 12a1e0a..6a0e2c3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
@@ -31,7 +31,7 @@ public abstract class CopierTemplate implements Copier{
allocateVectors(recordCount);
int outgoingPosition = 0;
- for(int svIndex = 0; svIndex < recordCount * 2; svIndex++, outgoingPosition++){
+ for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
doEval(svIndex, outgoingPosition);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 7b90717..68793b0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -40,7 +40,7 @@ public class RemovingRecordBatch implements RecordBatch{
private Copier copier;
private List<ValueVector> outputVectors;
private VectorHolder vh;
-
+ private int recordCount;
public RemovingRecordBatch(RecordBatch incoming, FragmentContext context){
this.incoming = incoming;
@@ -65,7 +65,7 @@ public class RemovingRecordBatch implements RecordBatch{
@Override
public int getRecordCount() {
- return incoming.getRecordCount();
+ return recordCount;
}
@Override
@@ -95,7 +95,7 @@ public class RemovingRecordBatch implements RecordBatch{
@Override
public IterOutcome next() {
-
+ recordCount = 0;
IterOutcome upstream = incoming.next();
logger.debug("Upstream... {}", upstream);
switch(upstream){
@@ -114,7 +114,7 @@ public class RemovingRecordBatch implements RecordBatch{
}
// fall through.
case OK:
- int recordCount = incoming.getRecordCount();
+ recordCount = incoming.getRecordCount();
copier.copyRecords();
for(ValueVector v : this.outputVectors){
ValueVector.Mutator m = v.getMutator();
@@ -195,7 +195,7 @@ public class RemovingRecordBatch implements RecordBatch{
this.vh = new VectorHolder(outputVectors);
SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.NONE);
- for(ValueVector v : outputVectors){
+ for(ValueVector v : incoming){
bldr.addField(v.getField());
}
this.outSchema = bldr.build();
@@ -225,7 +225,7 @@ public class RemovingRecordBatch implements RecordBatch{
JVar inVV = declareVVSetup("incoming", g, fieldId, vvClass);
JVar outVV = declareVVSetup("outgoing", g, fieldId, vvClass);
- g.getBlock().add(inVV.invoke("copyValue").arg(inIndex).arg(outIndex).arg(outVV));
+ g.getBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
fieldId++;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index e7f381f..685cc77 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -61,8 +61,13 @@ public class WritableBatch {
List<ByteBuf> buffers = Lists.newArrayList();
List<FieldMetadata> metadata = Lists.newArrayList();
+
for (ValueVector vv : vectors) {
metadata.add(vv.getMetadata());
+
+ // don't try to get the buffers if we don't have any records. It is possible the buffers are dead buffers.
+ if(recordCount == 0) continue;
+
for (ByteBuf b : vv.getBuffers()) {
buffers.add(b);
b.retain();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 33938cc..a38c8e5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -43,12 +43,12 @@ public class SelectionVector2 implements Closeable{
return recordCount;
}
- public char getIndex(int directIndex){
- return buffer.getChar(directIndex);
+ public char getIndex(int index){
+ return buffer.getChar(index);
}
- public void setIndex(int directIndex, char value){
- buffer.setChar(directIndex, value);
+ public void setIndex(int index, char value){
+ buffer.setChar(index*2, value);
}
public void allocateNew(int size){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index ad44ff2..40b8edc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -45,8 +45,12 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
}
- public void submitQuery(UserResultsListener resultsListener, RunQuery query) throws RpcException {
- send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
+ public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
+ try{
+ send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
+ }catch(RpcException ex){
+ resultsListener.submissionFailed(ex);
+ }
}
public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint) throws RpcException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 1376b7d..910b80e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -57,8 +57,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
return len;
}
- public void copyValue(int inIndex, int outIndex, BitVector target) {
- target.mutator.set(outIndex, this.accessor.get(inIndex));
+ public void copyFrom(int inIndex, int outIndex, BitVector from) {
+ this.mutator.set(outIndex, from.accessor.get(inIndex));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index a90382a..0a4614a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -51,6 +51,7 @@ import org.apache.drill.exec.util.AtomicState;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
/**
@@ -163,7 +164,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
private void parseAndRunLogicalPlan(String json) {
try {
LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json);
+ logger.debug("Logical {}", logicalPlan.unparse(DrillConfig.create()));
PhysicalPlan physicalPlan = convert(logicalPlan);
+ logger.debug("Physical {}", new ObjectMapper().writeValueAsString(physicalPlan));
runPhysicalPlan(physicalPlan);
} catch (IOException e) {
fail("Failure while parsing logical plan.", e);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
new file mode 100644
index 0000000..155b8da
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
@@ -0,0 +1,24 @@
+package org.apache.drill.exec.memory;
+
+import static org.junit.Assert.assertEquals;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.DirectBufferAllocator;
+import org.junit.Test;
+
+
+
+public class TestEndianess {
+
+ @Test
+ public void testLittleEndian(){
+ DirectBufferAllocator a = new DirectBufferAllocator();
+ ByteBuf b = a.buffer(4);
+ b.setInt(0, 35);
+ assertEquals((int) b.getByte(0), 35);
+ assertEquals((int) b.getByte(1), 0);
+ assertEquals((int) b.getByte(2), 0);
+ assertEquals((int) b.getByte(3), 0);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/pom.xml b/sandbox/prototype/exec/pom.xml
index df02dea..8c54a19 100644
--- a/sandbox/prototype/exec/pom.xml
+++ b/sandbox/prototype/exec/pom.xml
@@ -15,6 +15,7 @@
<dependencies>
</dependencies>
<modules>
+ <module>bufferl</module>
<module>java-exec</module>
<module>ref</module>
</modules>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/pom.xml b/sandbox/prototype/sqlparser/pom.xml
index 3309d36..9be576e 100644
--- a/sandbox/prototype/sqlparser/pom.xml
+++ b/sandbox/prototype/sqlparser/pom.xml
@@ -26,7 +26,7 @@
</repositories>
<dependencies>
- <dependency>
+ <dependency>
<groupId>net.hydromatic</groupId>
<artifactId>optiq</artifactId>
<version>0.4.2</version>
@@ -34,13 +34,32 @@
<dependency>
<groupId>net.hydromatic</groupId>
<artifactId>linq4j</artifactId>
- <version>0.1.3</version>
+ <version>0.1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>common</artifactId>
+ <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>ref</artifactId>
<version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>java-exec</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
</dependency>
+
+
<dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>ref</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
index 23d7237..602a55d 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java
@@ -17,6 +17,7 @@
******************************************************************************/
package org.apache.drill.jdbc;
+import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Map;
@@ -30,46 +31,73 @@ import net.hydromatic.linq4j.expressions.MethodCallExpression;
import net.hydromatic.optiq.*;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.ref.rops.DataWriter;
import org.apache.drill.exec.ref.rse.ClasspathRSE;
import org.apache.drill.exec.ref.rse.ClasspathRSE.ClasspathInputConfig;
-import org.apache.drill.optiq.*;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.optiq.DrillRel;
+import org.apache.drill.optiq.DrillScan;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptTable;
import org.eigenbase.reltype.RelDataType;
import org.eigenbase.reltype.RelDataTypeFactory;
import org.eigenbase.sql.type.SqlTypeName;
-/** Optiq Table used by Drill. */
+/**
+ * Optiq Table used by Drill.
+ */
public class DrillTable extends BaseQueryable<Object>
- implements TranslatableTable<Object>
-{
+ implements TranslatableTable<Object> {
private final Schema schema;
private final String name;
private final String storageEngineName;
private final RelDataType rowType;
public final StorageEngineConfig storageEngineConfig;
public final Object selection;
+ private boolean useReferenceInterpreter;
- /** Creates a DrillTable. */
+ // full engine connection information
+ public Drillbit bit;
+ public DrillClient client;
+
+ /**
+ * Creates a DrillTable.
+ */
public DrillTable(Schema schema,
- Type elementType,
- Expression expression,
- RelDataType rowType,
- String name,
- StorageEngineConfig storageEngineConfig,
- Object selection,
- String storageEngineName
- ) {
+ Type elementType,
+ Expression expression,
+ RelDataType rowType,
+ String name,
+ StorageEngineConfig storageEngineConfig,
+ Object selection,
+ String storageEngineName,
+ boolean useReferenceInterpreter
+ ) {
+
super(schema.getQueryProvider(), elementType, expression);
+ DrillConfig config = DrillConfig.create();
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ try {
+ bit = new Drillbit(config, serviceSet);
+ client = new DrillClient(config, serviceSet.getCoordinator());
+ bit.run();
+ } catch (IOException e) {
+ System.out.println("Error creating drill client or connecting to drillbit.");
+ } catch (Exception e) {
+ System.out.println("Error creating drill client or connecting to drillbit.");
+ }
this.schema = schema;
this.name = name;
this.rowType = rowType;
this.storageEngineConfig = storageEngineConfig;
this.selection = selection;
this.storageEngineName = storageEngineName;
+ this.useReferenceInterpreter = useReferenceInterpreter;
}
private static DrillTable createTable(
@@ -78,8 +106,9 @@ public class DrillTable extends BaseQueryable<Object>
String name,
StorageEngineConfig storageEngineConfig,
Object selection,
- String storageEngineName
- ) {
+ String storageEngineName,
+ boolean useReferenceInterpreter
+ ) {
final MethodCallExpression call = Expressions.call(schema.getExpression(),
BuiltinMethod.DATA_CONTEXT_GET_TABLE.method,
Expressions.constant(name),
@@ -91,8 +120,8 @@ public class DrillTable extends BaseQueryable<Object>
typeFactory.createSqlType(SqlTypeName.VARCHAR),
typeFactory.createSqlType(SqlTypeName.ANY))),
Collections.singletonList("_MAP"));
- return new DrillTable(schema, Object.class, call, rowType, name,
- storageEngineConfig, selection, storageEngineName);
+ return new DrillTable(schema, Object.class, call, rowType, name,
+ storageEngineConfig, selection, storageEngineName, useReferenceInterpreter);
}
@Override
@@ -129,19 +158,32 @@ public class DrillTable extends BaseQueryable<Object>
return t0 != null ? t0 : t1;
}
- /** Factory for custom tables in Optiq schema. */
+ public boolean useReferenceInterpreter() {
+ return useReferenceInterpreter;
+ }
+
+ /**
+ * Factory for custom tables in Optiq schema.
+ */
@SuppressWarnings("UnusedDeclaration")
public static class Factory implements TableFactory<DrillTable> {
@Override
public DrillTable create(Schema schema, String name,
- Map<String, Object> operand, RelDataType rowType) {
+ Map<String, Object> operand, RelDataType rowType) {
final ClasspathRSE.ClasspathRSEConfig rseConfig =
new ClasspathRSE.ClasspathRSEConfig();
final ClasspathInputConfig inputConfig = new ClasspathInputConfig();
inputConfig.path = last((String) operand.get("path"), "/donuts.json");
+ boolean useReferenceInterpreter;
+ if (operand.get("useReferenceInterpreter") != null){
+ useReferenceInterpreter = operand.get("useReferenceInterpreter").equals("true") ? true : false;
+ }
+ else{
+ useReferenceInterpreter = false;
+ }
inputConfig.type = DataWriter.ConverterType.JSON;
return createTable(schema.getTypeFactory(), (MutableSchema) schema, name,
- rseConfig, inputConfig, "donuts-json");
+ rseConfig, inputConfig, "donuts-json", useReferenceInterpreter);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java
deleted file mode 100644
index 0a225cf..0000000
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.optiq;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
-
-import net.hydromatic.linq4j.AbstractEnumerable;
-import net.hydromatic.linq4j.Enumerable;
-import net.hydromatic.linq4j.Enumerator;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.logical.LogicalPlan;
-import org.apache.drill.exec.ref.IteratorRegistry;
-import org.apache.drill.exec.ref.ReferenceInterpreter;
-import org.apache.drill.exec.ref.RunOutcome;
-import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
-import org.apache.drill.exec.ref.rse.RSERegistry;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * Runtime helper that executes a Drill query and converts it into an
- * {@link Enumerable}.
- */
-public class EnumerableDrill<E>
- extends AbstractEnumerable<E>
- implements Enumerable<E> {
- private final LogicalPlan plan;
- final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
- final DrillConfig config;
- private final String holder;
- private final List<String> fields;
-
- private static final ObjectMapper mapper = createMapper();
-
- /** Creates a DrillEnumerable.
- *
- * @param plan Logical plan
- * @param clazz Type of elements returned from enumerable
- * @param fields Names of fields, or null to return the whole blob
- */
- public EnumerableDrill(DrillConfig config, LogicalPlan plan, Class<E> clazz,
- List<String> fields) {
- this.plan = plan;
- this.config = config;
- this.holder = null;
- this.fields = fields;
- config.setSinkQueues(0, queue);
- }
-
- /** Creates a DrillEnumerable from a plan represented as a string. Each record
- * returned is a {@link JsonNode}. */
- public static <E> EnumerableDrill<E> of(String plan,
- final List<String> fieldNames, Class<E> clazz) {
- DrillConfig config = DrillConfig.create();
- final LogicalPlan parse = LogicalPlan.parse(config, plan);
- return new EnumerableDrill<>(config, parse, clazz, fieldNames);
- }
-
- /** Runs the plan as a background task. */
- Future<Collection<RunOutcome>> runPlan(
- CompletionService<Collection<RunOutcome>> service) {
- IteratorRegistry ir = new IteratorRegistry();
- DrillConfig config = DrillConfig.create();
- config.setSinkQueues(0, queue);
- final ReferenceInterpreter i =
- new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir),
- new RSERegistry(config));
- try {
- i.setup();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return service.submit(
- new Callable<Collection<RunOutcome>>() {
- @Override
- public Collection<RunOutcome> call() throws Exception {
- Collection<RunOutcome> outcomes = i.run();
-
- for (RunOutcome outcome : outcomes) {
- System.out.println("============");
- System.out.println(outcome);
- if (outcome.outcome == RunOutcome.OutcomeType.FAILED
- && outcome.exception != null) {
- outcome.exception.printStackTrace();
- }
- }
- return outcomes;
- }
- });
- }
-
- @Override
- public Enumerator<E> enumerator() {
- // TODO: use a completion service from the container
- final ExecutorCompletionService<Collection<RunOutcome>> service =
- new ExecutorCompletionService<Collection<RunOutcome>>(
- new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
- new LinkedBlockingDeque<Runnable>(10)));
-
- // Run the plan using an executor. It runs in a different thread, writing
- // results to our queue.
- //
- // TODO: use the result of task, and check for exceptions
- final Future<Collection<RunOutcome>> task = runPlan(service);
-
- return new JsonEnumerator(queue, fields);
- }
-
- private static ObjectMapper createMapper() {
- return new ObjectMapper();
- }
-
- /** Converts a JSON document, represented as an array of bytes, into a Java
- * object (consisting of Map, List, String, Integer, Double, Boolean). */
- static Object parseJson(byte[] bytes) {
- try {
- return wrapper(mapper.readTree(bytes));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /** Converts a JSON node to Java objects ({@link List}, {@link Map},
- * {@link String}, {@link Integer}, {@link Double}, {@link Boolean}. */
- static Object wrapper(JsonNode node) {
- switch (node.asToken()) {
- case START_OBJECT:
- return map((ObjectNode) node);
- case START_ARRAY:
- return array((ArrayNode) node);
- case VALUE_STRING:
- return node.asText();
- case VALUE_NUMBER_INT:
- return node.asInt();
- case VALUE_NUMBER_FLOAT:
- return node.asDouble();
- case VALUE_TRUE:
- return Boolean.TRUE;
- case VALUE_FALSE:
- return Boolean.FALSE;
- case VALUE_NULL:
- return null;
- default:
- throw new AssertionError("unexpected: " + node + ": " + node.asToken());
- }
- }
-
- private static List<Object> array(ArrayNode node) {
- final List<Object> list = new ArrayList<>();
- for (JsonNode jsonNode : node) {
- list.add(wrapper(jsonNode));
- }
- return Collections.unmodifiableList(list);
- }
-
- private static SortedMap<String, Object> map(ObjectNode node) {
- // TreeMap makes the results deterministic.
- final TreeMap<String, Object> map = new TreeMap<>();
- final Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
- while (fields.hasNext()) {
- Map.Entry<String, JsonNode> next = fields.next();
- map.put(next.getKey(), wrapper(next.getValue()));
- }
- return Collections.unmodifiableSortedMap(map);
- }
-
- private static class JsonEnumerator implements Enumerator {
- private final BlockingQueue<Object> queue;
- private final String holder;
- private final List<String> fields;
- private Object current;
-
- public JsonEnumerator(BlockingQueue<Object> queue, List<String> fields) {
- this.queue = queue;
- this.holder = null;
- this.fields = fields;
- }
-
- public Object current() {
- return current;
- }
-
- public boolean moveNext() {
- try {
- Object o = queue.take();
- if (o instanceof RunOutcome.OutcomeType) {
- switch ((RunOutcome.OutcomeType) o) {
- case SUCCESS:
- return false; // end of data
- case CANCELED:
- throw new RuntimeException("canceled");
- case FAILED:
- default:
- throw new RuntimeException("failed");
- }
- } else {
- Object o1 = parseJson((byte[]) o);
- if (holder != null) {
- o1 = ((Map<String, Object>) o1).get(holder);
- }
- if (fields == null) {
- current = o1;
- } else {
- final Map<String, Object> map = (Map<String, Object>) o1;
- if (fields.size() == 1) {
- current = map.get(fields.get(0));
- } else {
- Object[] os = new Object[fields.size()];
- for (int i = 0; i < os.length; i++) {
- os[i] = map.get(fields.get(i));
- }
- current = os;
- }
- }
- return true;
- }
- } catch (InterruptedException e) {
- Thread.interrupted();
- throw new RuntimeException(e);
- }
- }
-
- public void reset() {
- throw new UnsupportedOperationException();
- }
- }
-}
-
-// End EnumerableDrill.java
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java
new file mode 100644
index 0000000..8c41b99
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillFullEngine.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.optiq;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import net.hydromatic.linq4j.AbstractEnumerable;
+import net.hydromatic.linq4j.Enumerable;
+import net.hydromatic.linq4j.Enumerator;
+import net.hydromatic.optiq.DataContext;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.jdbc.DrillTable;
+import org.apache.drill.sql.client.full.DrillFullImpl;
+import org.apache.drill.sql.client.ref.DrillRefImpl;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Runtime helper that executes a Drill query and converts it into an {@link Enumerable}.
+ */
+public class EnumerableDrillFullEngine<E> extends AbstractEnumerable<E> implements Enumerable<E> {
+ private final String plan;
+ final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
+ final DrillConfig config;
+ private final List<String> fields;
+ private DataContext drillConnectionDataContext;
+
+ /**
+ * Creates a DrillEnumerable.
+ *
+ * @param plan
+ * Logical plan
+ * @param clazz
+ * Type of elements returned from enumerable
+ * @param fields
+ * Names of fields, or null to return the whole blob
+ */
+ public EnumerableDrillFullEngine(DrillConfig config, String plan, Class<E> clazz, List<String> fields, DataContext drillConnectionDataContext) {
+ this.plan = plan;
+ this.config = config;
+ this.fields = fields;
+ this.drillConnectionDataContext = drillConnectionDataContext;
+ config.setSinkQueues(0, queue);
+ }
+
+ /**
+ * Creates a DrillEnumerable from a plan represented as a string. Each record returned is a {@link JsonNode}.
+ */
+ public static <E> EnumerableDrillFullEngine<E> of(String plan, final List<String> fieldNames, Class<E> clazz,
+ DataContext drillTable) {
+ DrillConfig config = DrillConfig.create();
+ return new EnumerableDrillFullEngine<>(config, plan, clazz, fieldNames, drillTable);
+ }
+
+ @Override
+ public Enumerator<E> enumerator() {
+ DrillTable table = (DrillTable) drillConnectionDataContext.getSubSchema("DONUTS").getTable("DONUTS", Object.class);
+ if(table.useReferenceInterpreter()){
+ DrillRefImpl<E> impl = new DrillRefImpl<E>(plan, config, fields, queue);
+ return impl.enumerator(table);
+ } else {
+ DrillFullImpl<E> impl = new DrillFullImpl<E>(plan, config, fields);
+ return impl.enumerator(table);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java
index ce21c92..567378d 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRel.java
@@ -20,10 +20,12 @@ package org.apache.drill.optiq;
import net.hydromatic.linq4j.expressions.*;
import net.hydromatic.linq4j.function.Function1;
import net.hydromatic.linq4j.function.Functions;
+import net.hydromatic.optiq.DataContext;
import net.hydromatic.optiq.impl.java.JavaTypeFactory;
import net.hydromatic.optiq.rules.java.*;
import org.apache.drill.common.util.Hook;
+import org.apache.drill.optiq.EnumerableDrillFullEngine;
import org.eigenbase.rel.RelNode;
import org.eigenbase.rel.SingleRel;
import org.eigenbase.relopt.*;
@@ -44,7 +46,7 @@ public class EnumerableDrillRel extends SingleRel implements EnumerableRel {
private static final Logger LOG =
LoggerFactory.getLogger(EnumerableDrillRel.class);
- private static final Function1<String,Expression> TO_LITERAL =
+ private static final Function1<String, Expression> TO_LITERAL =
new Function1<String, Expression>() {
@Override
public Expression apply(String a0) {
@@ -59,16 +61,16 @@ public class EnumerableDrillRel extends SingleRel implements EnumerableRel {
static {
try {
OF_METHOD =
- EnumerableDrill.class.getMethod("of", String.class, List.class, Class.class);
+ EnumerableDrillFullEngine.class.getMethod("of", String.class, List.class, Class.class, DataContext.class);
+ //EnumerableDrillFullEngine.class.getMethod("of", String.class, List.class, Class.class);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
}
public EnumerableDrillRel(RelOptCluster cluster,
- RelTraitSet traitSet,
- RelNode input)
- {
+ RelTraitSet traitSet,
+ RelNode input) {
super(cluster, traitSet, input);
assert getConvention() instanceof EnumerableConvention;
assert input.getConvention() == DrillRel.CONVENTION;
@@ -100,7 +102,11 @@ public class EnumerableDrillRel extends SingleRel implements EnumerableRel {
drillImplementor.go(input);
String plan = drillImplementor.getJsonString();
Hook.LOGICAL_PLAN.run(plan);
+
+ // not quite sure where this list was supposed to be set earlier, leaving it null got me back the full result set
+
final List<String> fieldNameList = RelOptUtil.getFieldNameList(rowType);
+ //final List<String> fieldNameList = null;
return new BlockBuilder()
.append(
Expressions.call(
@@ -112,7 +118,9 @@ public class EnumerableDrillRel extends SingleRel implements EnumerableRel {
Expressions.newArrayInit(
String.class,
Functions.apply(fieldNameList, TO_LITERAL))),
- Expressions.constant(Object.class)))
+ Expressions.constant(Object.class),
+ Expressions.variable(DataContext.class, "root")
+ ))
.toBlock();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRule.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRule.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRule.java
index fa1bd4f..6057163 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRule.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrillRule.java
@@ -22,20 +22,14 @@ import org.eigenbase.rel.RelNode;
import org.eigenbase.rel.convert.ConverterRule;
/**
- * Rule that converts any Drill relational expression to enumerable format by
- * adding a {@link EnumerableDrillRel}.
+ * Rule that converts any Drill relational expression to enumerable format by adding a {@link EnumerableDrillRel}.
*/
public class EnumerableDrillRule extends ConverterRule {
- public static final EnumerableDrillRule ARRAY_INSTANCE =
- new EnumerableDrillRule(EnumerableConvention.ARRAY);
- public static final EnumerableDrillRule CUSTOM_INSTANCE =
- new EnumerableDrillRule(EnumerableConvention.CUSTOM);
+ public static final EnumerableDrillRule ARRAY_INSTANCE = new EnumerableDrillRule(EnumerableConvention.ARRAY);
+ public static final EnumerableDrillRule CUSTOM_INSTANCE = new EnumerableDrillRule(EnumerableConvention.CUSTOM);
private EnumerableDrillRule(EnumerableConvention outConvention) {
- super(RelNode.class,
- DrillRel.CONVENTION,
- outConvention,
- "EnumerableDrillRule." + outConvention);
+ super(RelNode.class, DrillRel.CONVENTION, outConvention, "EnumerableDrillRule." + outConvention);
}
@Override
@@ -46,9 +40,7 @@ public class EnumerableDrillRule extends ConverterRule {
@Override
public RelNode convert(RelNode rel) {
assert rel.getTraitSet().contains(DrillRel.CONVENTION);
- return new EnumerableDrillRel(rel.getCluster(),
- rel.getTraitSet().replace(getOutConvention()),
- rel);
+ return new EnumerableDrillRel(rel.getCluster(), rel.getTraitSet().replace(getOutConvention()), rel);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
new file mode 100644
index 0000000..b3a7e35
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
@@ -0,0 +1,48 @@
+package org.apache.drill.sql.client.full;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+
+public class BatchListener implements UserResultsListener {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchListener.class);
+
+ private RpcException ex;
+ private volatile boolean completed = false;
+
+ final BlockingQueue<QueryResultBatch> queue = new ArrayBlockingQueue<>(100);
+
+ @Override
+ public void submissionFailed(RpcException ex) {
+ this.ex = ex;
+ completed = true;
+ }
+
+ @Override
+ public void resultArrived(QueryResultBatch result) {
+ logger.debug("Result arrived {}", result);
+ queue.add(result);
+ if(result.getHeader().getIsLastChunk()){
+ completed = true;
+ }
+ }
+
+ public boolean completed(){
+ return completed;
+ }
+
+ public QueryResultBatch getNext() throws RpcException, InterruptedException{
+ if(ex != null) throw ex;
+ if(completed && queue.isEmpty()){
+ return null;
+ }else{
+ return queue.take();
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
new file mode 100644
index 0000000..4eb243d
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
@@ -0,0 +1,185 @@
+package org.apache.drill.sql.client.full;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import jline.internal.Preconditions;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.google.common.collect.Maps;
+
+@JsonSerialize(using = BatchLoaderMap.Se.class)
+public class BatchLoaderMap implements Map<String, Object> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchLoaderMap.class);
+
+ private BatchListener listener;
+ private RecordBatchLoader loader;
+ private final List<String> requestedFields;
+ private final Map<String, ValueVector> fields = Maps.newHashMap();
+ private int index;
+ private Object[] objArr;
+
+ public BatchLoaderMap(List<String> requestedFields, BatchListener listener, DrillbitContext context) {
+ this.listener = listener;
+ this.requestedFields = requestedFields;
+ this.objArr = new Object[requestedFields.size()];
+ this.loader = new RecordBatchLoader(context.getAllocator());
+ }
+
+ private void load(QueryResultBatch batch) throws SchemaChangeException {
+ boolean schemaChanged = loader.load(batch.getHeader().getDef(), batch.getData());
+ if (schemaChanged) {
+ fields.clear();
+ for (ValueVector v : loader) {
+ fields.put(v.getField().getName(), v);
+ }
+ } else {
+ logger.debug("Schema didn't change. {}", batch);
+ }
+ }
+
+ public boolean next() throws SchemaChangeException, RpcException, InterruptedException {
+ index++;
+ if (index < loader.getRecordCount()) {
+ return true;
+ } else {
+ logger.debug("Starting next query result batch.");
+ QueryResultBatch qrb;
+ while( (qrb = listener.getNext()) != null && !qrb.hasData()){
+ qrb = listener.getNext();
+ }
+
+ if (qrb == null) {
+ logger.debug("No more batches found.");
+ index = -1;
+ return false;
+ } else {
+ load(qrb);
+ logger.debug("New batch found and loaded. {}", qrb.getHeader().getDef());
+ index = 0;
+ return true;
+ }
+
+ }
+ }
+
+ public Object getCurrentAsObjectArray() {
+
+ for (int i = 0; i < requestedFields.size(); i++) {
+ ValueVector vv = fields.get(requestedFields.get(i));
+ if (vv == null) {
+ objArr[i] = null;
+ } else {
+ objArr[i] = vv.getAccessor().getObject(index);
+ }
+ }
+ return objArr;
+
+ }
+
+ @Override
+ public int size() {
+ return fields.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return fields.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ if (!(key instanceof String))
+ return false;
+ return fields.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object get(Object key) {
+ ValueVector v = fields.get(key);
+ Preconditions.checkNotNull(v);
+ return v.getAccessor().getObject(index);
+ }
+
+ @Override
+ public Object put(String key, Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object remove(Object key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void putAll(Map<? extends String, ? extends Object> m) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ }
+
+ @Override
+ public Set<String> keySet() {
+ return fields.keySet();
+ }
+
+ @Override
+ public Collection<Object> values() {
+ throw new UnsupportedOperationException();
+ }
+
+ public String toString() {
+ try {
+ return new ObjectMapper().writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Set<java.util.Map.Entry<String, Object>> entrySet() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static class Se extends StdSerializer<BatchLoaderMap> {
+
+ public Se() {
+ super(BatchLoaderMap.class);
+ }
+
+ @Override
+ public void serialize(BatchLoaderMap value, JsonGenerator jgen, SerializerProvider provider) throws IOException,
+ JsonGenerationException {
+ jgen.writeStartObject();
+ assert value.index > -1 && value.index < value.loader.getRecordCount();
+ for (Map.Entry<String, ValueVector> me : value.fields.entrySet()) {
+ jgen.writeObjectField(me.getKey(), me.getValue().getAccessor().getObject(value.index));
+ }
+ jgen.writeEndObject();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/DrillFullImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/DrillFullImpl.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/DrillFullImpl.java
new file mode 100644
index 0000000..0974365
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/DrillFullImpl.java
@@ -0,0 +1,64 @@
+package org.apache.drill.sql.client.full;
+
+import java.util.List;
+
+import net.hydromatic.linq4j.Enumerator;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.jdbc.DrillTable;
+
+public class DrillFullImpl<E>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFullImpl.class);
+
+ private final String plan;
+ final DrillConfig config;
+ private final List<String> fields;
+
+
+ public DrillFullImpl(String plan, DrillConfig config, List<String> fields) {
+ super();
+ this.plan = plan;
+ this.config = config;
+ this.fields = fields;
+ }
+
+ public Enumerator<E> enumerator(DrillTable table) {
+
+ BatchListener listener = new BatchListener();
+
+ // TODO: use a completion service from the container
+ QueryRequestRunner runner = new QueryRequestRunner(plan, table.client, listener);
+ runner.start();
+
+ return (Enumerator<E>) new ResultEnumerator(listener, table.bit.getContext(),fields);
+
+ }
+
+ public class QueryRequestRunner extends Thread{
+ final String plan;
+ final DrillClient client;
+ final BatchListener listener;
+
+ public QueryRequestRunner(String plan, DrillClient client, BatchListener listener) {
+ super();
+ this.setDaemon(true);
+ this.plan = plan;
+ this.client = client;
+ this.listener = listener;
+ }
+
+ @Override
+ public void run() {
+ try {
+ client.connect();
+ client.runQuery(UserProtos.QueryType.LOGICAL, plan, listener);
+ } catch (RpcException e) {
+ listener.submissionFailed(e);
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/ResultEnumerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/ResultEnumerator.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/ResultEnumerator.java
new file mode 100644
index 0000000..8839e92
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/ResultEnumerator.java
@@ -0,0 +1,48 @@
+package org.apache.drill.sql.client.full;
+
+import java.util.List;
+import java.util.Map;
+
+import net.hydromatic.linq4j.Enumerator;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Enumerator used for full execution engine.
+ */
+ class ResultEnumerator implements Enumerator<Object> {
+
+ private final BatchLoaderMap loaderMap;
+ private Object current;
+
+ public ResultEnumerator(BatchListener listener, DrillbitContext context, List<String> fields) {
+ this.loaderMap = new BatchLoaderMap(fields, listener, context);
+ }
+
+ public Object current() {
+ return current;
+ }
+
+ public boolean moveNext() {
+
+ try {
+ boolean succ = loaderMap.next();
+ if(succ){
+ current = loaderMap.getCurrentAsObjectArray();
+ }
+ return succ;
+
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException(e);
+ } catch (RpcException | SchemaChangeException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void reset() {
+ throw new UnsupportedOperationException();
+ }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java
new file mode 100644
index 0000000..651dd69
--- /dev/null
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java
@@ -0,0 +1,240 @@
+package org.apache.drill.sql.client.ref;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import net.hydromatic.linq4j.Enumerator;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.exec.ref.IteratorRegistry;
+import org.apache.drill.exec.ref.ReferenceInterpreter;
+import org.apache.drill.exec.ref.RunOutcome;
+import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
+import org.apache.drill.exec.ref.rse.RSERegistry;
+import org.apache.drill.jdbc.DrillTable;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class DrillRefImpl<E> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRefImpl.class);
+
+ private static final ObjectMapper mapper = createMapper();
+
+ private final String plan;
+ final BlockingQueue<Object> queue;
+ final DrillConfig config;
+ private final List<String> fields;
+
+
+ public DrillRefImpl(String plan, DrillConfig config, List<String> fields, BlockingQueue<Object> queue) {
+ super();
+ this.plan = plan;
+ this.config = config;
+ this.fields = fields;
+ this.queue = queue;
+ }
+
+
+ private static ObjectMapper createMapper() {
+ return new ObjectMapper();
+ }
+
+ /**
+ * Enumerator used for reference interpreter
+ */
+ private static class JsonEnumerator implements Enumerator {
+ private final BlockingQueue<Object> queue;
+ private final String holder;
+ private final List<String> fields;
+ private Object current;
+
+ public JsonEnumerator(BlockingQueue<Object> queue, List<String> fields) {
+ this.queue = queue;
+ this.holder = null;
+ this.fields = fields;
+ }
+
+ public Object current() {
+ return current;
+ }
+
+ public boolean moveNext() {
+ try {
+ Object o = queue.take();
+ if (o instanceof RunOutcome.OutcomeType) {
+ switch ((RunOutcome.OutcomeType) o) {
+ case SUCCESS:
+ return false; // end of data
+ case CANCELED:
+ throw new RuntimeException("canceled");
+ case FAILED:
+ default:
+ throw new RuntimeException("failed");
+ }
+ } else {
+ Object o1 = parseJson((byte[]) o);
+ if (holder != null) {
+ o1 = ((Map<String, Object>) o1).get(holder);
+ }
+ if (fields == null) {
+ current = o1;
+ } else {
+ final Map<String, Object> map = (Map<String, Object>) o1;
+ if (fields.size() == 1) {
+ current = map.get(fields.get(0));
+ } else {
+ Object[] os = new Object[fields.size()];
+ for (int i = 0; i < os.length; i++) {
+ os[i] = map.get(fields.get(i));
+ }
+ current = os;
+ }
+ }
+ return true;
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void reset() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
+ /**
+ * Runs the plan as a background task.
+ */
+ Future<Collection<RunOutcome>> runRefInterpreterPlan(
+ CompletionService<Collection<RunOutcome>> service) {
+ LogicalPlan parsedPlan = LogicalPlan.parse(DrillConfig.create(), plan);
+ IteratorRegistry ir = new IteratorRegistry();
+ DrillConfig config = DrillConfig.create();
+ config.setSinkQueues(0, queue);
+ final ReferenceInterpreter i =
+ new ReferenceInterpreter(parsedPlan, ir, new BasicEvaluatorFactory(ir),
+ new RSERegistry(config));
+ try {
+ i.setup();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return service.submit(
+ new Callable<Collection<RunOutcome>>() {
+ @Override
+ public Collection<RunOutcome> call() throws Exception {
+ Collection<RunOutcome> outcomes = i.run();
+
+ for (RunOutcome outcome : outcomes) {
+ System.out.println("============");
+ System.out.println(outcome);
+ if (outcome.outcome == RunOutcome.OutcomeType.FAILED
+ && outcome.exception != null) {
+ outcome.exception.printStackTrace();
+ }
+ }
+ return outcomes;
+ }
+ });
+ }
+
+
+
+ public Enumerator<E> enumerator(DrillTable table) {
+ // TODO: use a completion service from the container
+ final ExecutorCompletionService<Collection<RunOutcome>> service = new ExecutorCompletionService<Collection<RunOutcome>>(
+ new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10)));
+
+ // Run the plan using an executor. It runs in a different thread, writing
+ // results to our queue.
+ //
+ // TODO: use the result of task, and check for exceptions
+ final Future<Collection<RunOutcome>> task = runRefInterpreterPlan(service);
+
+ return new JsonEnumerator(queue, fields);
+
+ }
+
+ /**
+ * Converts a JSON document, represented as an array of bytes, into a Java
+ * object (consisting of Map, List, String, Integer, Double, Boolean).
+ */
+ static Object parseJson(byte[] bytes) {
+ try {
+ return wrapper(mapper.readTree(bytes));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+
+ /**
+ * Converts a JSON node to Java objects ({@link List}, {@link Map},
+ * {@link String}, {@link Integer}, {@link Double}, {@link Boolean}.
+ */
+ static Object wrapper(JsonNode node) {
+ switch (node.asToken()) {
+ case START_OBJECT:
+ return map((ObjectNode) node);
+ case START_ARRAY:
+ return array((ArrayNode) node);
+ case VALUE_STRING:
+ return node.asText();
+ case VALUE_NUMBER_INT:
+ return node.asInt();
+ case VALUE_NUMBER_FLOAT:
+ return node.asDouble();
+ case VALUE_TRUE:
+ return Boolean.TRUE;
+ case VALUE_FALSE:
+ return Boolean.FALSE;
+ case VALUE_NULL:
+ return null;
+ default:
+ throw new AssertionError("unexpected: " + node + ": " + node.asToken());
+ }
+ }
+
+ private static List<Object> array(ArrayNode node) {
+ final List<Object> list = new ArrayList<>();
+ for (JsonNode jsonNode : node) {
+ list.add(wrapper(jsonNode));
+ }
+ return Collections.unmodifiableList(list);
+ }
+
+ private static SortedMap<String, Object> map(ObjectNode node) {
+ // TreeMap makes the results deterministic.
+ final TreeMap<String, Object> map = new TreeMap<>();
+ final Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
+ while (fields.hasNext()) {
+ Map.Entry<String, JsonNode> next = fields.next();
+ map.put(next.getKey(), wrapper(next.getValue()));
+ }
+ return Collections.unmodifiableSortedMap(map);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
index ecee65b..a30ce65 100644
--- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
+++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java
@@ -24,10 +24,13 @@ import org.apache.drill.common.util.Hook;
import java.sql.*;
import java.util.Properties;
+import static org.junit.Assert.assertEquals;
+
/**
* Fluent interface for writing JDBC and query-planning tests.
*/
public class JdbcAssert {
+
public static One withModel(String model, String schema) {
final Properties info = new Properties();
info.setProperty("schema", schema);
@@ -35,10 +38,12 @@ public class JdbcAssert {
return new One(info);
}
- static String toString(ResultSet resultSet) throws SQLException {
+ static String toString(ResultSet resultSet, int expectedRecordCount) throws SQLException {
StringBuilder buf = new StringBuilder();
+ int total = 0, n;
while (resultSet.next()) {
- int n = resultSet.getMetaData().getColumnCount();
+ n = resultSet.getMetaData().getColumnCount();
+ total++;
String sep = "";
for (int i = 1; i <= n; i++) {
buf.append(sep)
@@ -49,9 +54,26 @@ public class JdbcAssert {
}
buf.append("\n");
}
+ if (false && expectedRecordCount > 0){
+ assertEquals("Expected record count not matched.", total, expectedRecordCount);
+ }
return buf.toString();
}
+ static String toString(ResultSet resultSet) throws SQLException {
+ return toString(resultSet, -1);
+ }
+
+ static int countRecords(ResultSet resultSet) throws SQLException {
+ StringBuilder buf = new StringBuilder();
+ int total = 0, n;
+ while (resultSet.next()) {
+ n = resultSet.getMetaData().getColumnCount();
+ total += n;
+ }
+ return total;
+ }
+
public static class One {
private final Properties info;
private final ConnectionFactory connectionFactory;
@@ -82,6 +104,8 @@ public class JdbcAssert {
}
}
}
+
+
}
public static class Two {
@@ -113,6 +137,27 @@ public class JdbcAssert {
}
}
+ public Two displayResults(int recordCount) throws Exception {
+ Connection connection = null;
+ Statement statement = null;
+ try {
+ connection = connectionFactory.createConnection();
+ statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ // record count check is done in toString method
+ System.out.println(JdbcAssert.toString(resultSet, recordCount));
+ resultSet.close();
+ return this;
+ } finally {
+ if (statement != null) {
+ statement.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
public void planContains(String expected) {
final String[] plan0 = {null};
Connection connection = null;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/103072a6/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
index 2d57732..26c7a06 100644
--- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
+++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
@@ -21,45 +21,85 @@ import com.google.common.base.Function;
import junit.framework.TestCase;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.jdbc.DrillTable;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
import java.sql.*;
-/** Unit tests for Drill's JDBC driver. */
+/**
+ * Unit tests for Drill's JDBC driver.
+ */
public class JdbcTest extends TestCase {
+
+ // Determine if we are in Eclipse Debug mode.
+ static final boolean IS_DEBUG = java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;
+
+ // Set a timeout unless we're debugging.
+ @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(10000);
+
+
private static final String MODEL =
"{\n"
- + " version: '1.0',\n"
- + " schemas: [\n"
- + " {\n"
- + " name: 'DONUTS',\n"
- + " tables: [\n"
- + " {\n"
- + " name: 'DONUTS',\n"
- + " type: 'custom',\n"
- + " factory: '" + DrillTable.Factory.class.getName() + "'\n,"
- + " operand: {\n"
- + " path: '/donuts.json'\n"
- + " }\n"
- + " }\n"
- + " ]\n"
- + " }\n"
- + " ]\n"
- + "}";
+ + " version: '1.0',\n"
+ + " schemas: [\n"
+ + " {\n"
+ + " name: 'DONUTS',\n"
+ + " tables: [\n"
+ + " {\n"
+ + " name: 'DONUTS',\n"
+ + " type: 'custom',\n"
+ + " factory: '" + DrillTable.Factory.class.getName() + "'\n,"
+ + " operand: {\n"
+ + " path: '/donuts.json',\n"
+ + " useReferenceInterpreter: 'true'\n"
+ + " }\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+
+ private static final String MODEL_FULL_ENGINE =
+ "{\n"
+ + " version: '1.0',\n"
+ + " schemas: [\n"
+ + " {\n"
+ + " name: 'DONUTS',\n"
+ + " tables: [\n"
+ + " {\n"
+ + " name: 'DONUTS',\n"
+ + " type: 'custom',\n"
+ + " factory: '" + DrillTable.Factory.class.getName() + "'\n,"
+ + " operand: {\n"
+ + " path: '/donuts.json'\n"
+ + " }\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
private static final String EXPECTED =
"_MAP={batters={batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}, {id=1003, type=Blueberry}, {id=1004, type=Devil's Food}]}, id=0001, name=Cake, ppu=0.55, sales=35, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5005, type=Sugar}, {id=5007, type=Powdered Sugar}, {id=5006, type=Chocolate with Sprinkles}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
- + "_MAP={batters={batter=[{id=1001, type=Regular}]}, id=0002, name=Raised, ppu=0.69, sales=145, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5005, type=Sugar}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
- + "_MAP={batters={batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}]}, id=0003, name=Old Fashioned, ppu=0.55, sales=300, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
- + "_MAP={batters={batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}, {id=1003, type=Blueberry}, {id=1004, type=Devil's Food}]}, filling=[{id=6001, type=None}, {id=6002, type=Raspberry}, {id=6003, type=Lemon}, {id=6004, type=Chocolate}, {id=6005, type=Kreme}], id=0004, name=Filled, ppu=0.69, sales=14, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5005, type=Sugar}, {id=5007, type=Powdered Sugar}, {id=5006, type=Chocolate with Sprinkles}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
- + "_MAP={batters={batter=[{id=1001, type=Regular}]}, id=0005, name=Apple Fritter, ppu=1.0, sales=700, topping=[{id=5002, type=Glazed}], type=donut}\n";
+ + "_MAP={batters={batter=[{id=1001, type=Regular}]}, id=0002, name=Raised, ppu=0.69, sales=145, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5005, type=Sugar}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
+ + "_MAP={batters={batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}]}, id=0003, name=Old Fashioned, ppu=0.55, sales=300, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
+ + "_MAP={batters={batter=[{id=1001, type=Regular}, {id=1002, type=Chocolate}, {id=1003, type=Blueberry}, {id=1004, type=Devil's Food}]}, filling=[{id=6001, type=None}, {id=6002, type=Raspberry}, {id=6003, type=Lemon}, {id=6004, type=Chocolate}, {id=6005, type=Kreme}], id=0004, name=Filled, ppu=0.69, sales=14, topping=[{id=5001, type=None}, {id=5002, type=Glazed}, {id=5005, type=Sugar}, {id=5007, type=Powdered Sugar}, {id=5006, type=Chocolate with Sprinkles}, {id=5003, type=Chocolate}, {id=5004, type=Maple}], type=donut}\n"
+ + "_MAP={batters={batter=[{id=1001, type=Regular}]}, id=0005, name=Apple Fritter, ppu=1.0, sales=700, topping=[{id=5002, type=Glazed}], type=donut}\n";
- /** Load driver. */
+ /**
+ * Load driver.
+ */
public void testLoadDriver() throws ClassNotFoundException {
Class.forName("org.apache.drill.jdbc.Driver");
}
- /** Load driver and make a connection. */
+ /**
+ * Load driver and make a connection.
+ */
public void testConnect() throws Exception {
Class.forName("org.apache.drill.jdbc.Driver");
final Connection connection = DriverManager.getConnection(
@@ -67,7 +107,9 @@ public class JdbcTest extends TestCase {
connection.close();
}
- /** Load driver, make a connection, prepare a statement. */
+ /**
+ * Load driver, make a connection, prepare a statement.
+ */
public void testPrepare() throws Exception {
JdbcAssert.withModel(MODEL, "DONUTS")
.withConnection(
@@ -85,14 +127,32 @@ public class JdbcTest extends TestCase {
});
}
- /** Simple query against JSON. */
+ /**
+ * Simple query against JSON.
+ */
public void testSelectJson() throws Exception {
JdbcAssert.withModel(MODEL, "DONUTS")
.sql("select * from donuts")
.returns(EXPECTED);
}
- /** Query with project list. No field references yet. */
+ public void testFullSelectStarEngine() throws Exception {
+ JdbcAssert.withModel(MODEL_FULL_ENGINE, "DONUTS")
+ //.sql("select cast(_MAP['red'] as bigint) + 1 as red_inc from donuts ")
+ .sql("select * from donuts ")
+ .displayResults(50);
+ }
+
+ public void testFullEngine() throws Exception {
+ JdbcAssert.withModel(MODEL_FULL_ENGINE, "DONUTS")
+ //.sql("select cast(_MAP['red'] as bigint) + 1 as red_inc from donuts ")
+ .sql("select cast(_MAP['RED'] as bigint) as RED, cast(_MAP['GREEN'] as bigint) as GREEN from donuts where cast(_MAP['red'] as BIGINT) > 1 ")
+ .displayResults(50);
+ }
+
+ /**
+ * Query with project list. No field references yet.
+ */
public void testProjectConstant() throws Exception {
JdbcAssert.withModel(MODEL, "DONUTS")
.sql("select 1 + 3 as c from donuts")
@@ -103,7 +163,9 @@ public class JdbcTest extends TestCase {
+ "C=4\n");
}
- /** Query that projects an element from the map. */
+ /**
+ * Query that projects an element from the map.
+ */
public void testProject() throws Exception {
JdbcAssert.withModel(MODEL, "DONUTS")
.sql("select _MAP['ppu'] as ppu from donuts")
@@ -114,11 +176,13 @@ public class JdbcTest extends TestCase {
+ "PPU=1.0\n");
}
- /** Same logic as {@link #testProject()}, but using a subquery. */
+ /**
+ * Same logic as {@link #testProject()}, but using a subquery.
+ */
public void testProjectOnSubquery() throws Exception {
JdbcAssert.withModel(MODEL, "DONUTS")
.sql("select d['ppu'] as ppu from (\n"
- + " select _MAP as d from donuts)")
+ + " select _MAP as d from donuts)")
.returns("PPU=0.55\n"
+ "PPU=0.69\n"
+ "PPU=0.55\n"
@@ -126,22 +190,26 @@ public class JdbcTest extends TestCase {
+ "PPU=1.0\n");
}
- /** Checks the logical plan. */
+ /**
+ * Checks the logical plan.
+ */
public void testProjectPlan() throws Exception {
JdbcAssert.withModel(MODEL, "DONUTS")
.sql("select _MAP['ppu'] as ppu from donuts")
.planContains(
"{'head':{'type':'APACHE_DRILL_LOGICAL','version':'1','generator':{'type':'manual','info':'na'}},"
- + "'storage':{'donuts-json':{'type':'classpath'},'queue':{'type':'queue'}},"
- + "'query':["
- + "{'op':'sequence','do':["
- + "{'op':'scan','memo':'initial_scan','ref':'_MAP','storageengine':'donuts-json','selection':{'path':'/donuts.json','type':'JSON'}},"
- + "{'op':'project','projections':[{'expr':'_MAP.ppu','ref':'output.PPU'}]},"
- + "{'op':'store','storageengine':'queue','memo':'output sink','target':{'number':0}}]}]}");
+ + "'storage':{'donuts-json':{'type':'classpath'},'queue':{'type':'queue'}},"
+ + "'query':["
+ + "{'op':'sequence','do':["
+ + "{'op':'scan','memo':'initial_scan','ref':'_MAP','storageengine':'donuts-json','selection':{'path':'/donuts.json','type':'JSON'}},"
+ + "{'op':'project','projections':[{'expr':'_MAP.ppu','ref':'output.PPU'}]},"
+ + "{'op':'store','storageengine':'queue','memo':'output sink','target':{'number':0}}]}]}");
}
- /** Query with subquery, filter, and projection of one real and one
- * nonexistent field from a map field. */
+ /**
+ * Query with subquery, filter, and projection of one real and one
+ * nonexistent field from a map field.
+ */
public void testProjectFilterSubquery() throws Exception {
JdbcAssert.withModel(MODEL, "DONUTS")
.sql("select d['name'] as name, d['xx'] as xx from (\n"
@@ -159,16 +227,18 @@ public class JdbcTest extends TestCase {
+ "where cast(d['ppu'] as double) > 0.6")
.planContains(
"{'head':{'type':'APACHE_DRILL_LOGICAL','version':'1','generator':{'type':'manual','info':'na'}},'storage':{'donuts-json':{'type':'classpath'},'queue':{'type':'queue'}},"
- + "'query':["
- + "{'op':'sequence','do':["
- + "{'op':'scan','memo':'initial_scan','ref':'_MAP','storageengine':'donuts-json','selection':{'path':'/donuts.json','type':'JSON'}},"
- + "{'op':'filter','expr':'(_MAP.donuts.ppu > 0.6)'},"
- + "{'op':'project','projections':[{'expr':'_MAP.donuts','ref':'output.D'}]},"
- + "{'op':'project','projections':[{'expr':'D.name','ref':'output.NAME'},{'expr':'D.xx','ref':'output.XX'}]},"
- + "{'op':'store','storageengine':'queue','memo':'output sink','target':{'number':0}}]}]}");
+ + "'query':["
+ + "{'op':'sequence','do':["
+ + "{'op':'scan','memo':'initial_scan','ref':'_MAP','storageengine':'donuts-json','selection':{'path':'/donuts.json','type':'JSON'}},"
+ + "{'op':'filter','expr':'(_MAP.donuts.ppu > 0.6)'},"
+ + "{'op':'project','projections':[{'expr':'_MAP.donuts','ref':'output.D'}]},"
+ + "{'op':'project','projections':[{'expr':'D.name','ref':'output.NAME'},{'expr':'D.xx','ref':'output.XX'}]},"
+ + "{'op':'store','storageengine':'queue','memo':'output sink','target':{'number':0}}]}]}");
}
- /** Query that projects one field. (Disabled; uses sugared syntax.) */
+ /**
+ * Query that projects one field. (Disabled; uses sugared syntax.)
+ */
public void _testProjectNestedFieldSugared() throws Exception {
JdbcAssert.withModel(MODEL, "DONUTS")
.sql("select donuts.ppu from donuts")
@@ -179,7 +249,9 @@ public class JdbcTest extends TestCase {
+ "C=4\n");
}
- /** Query with filter. No field references yet. */
+ /**
+ * Query with filter. No field references yet.
+ */
public void testFilterConstantFalse() throws Exception {
JdbcAssert.withModel(MODEL, "DONUTS")
.sql("select * from donuts where 3 > 4")