You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/06 23:46:30 UTC
[1/3] Sort operator using HyperBatch concept. Supports single and
multikey asc/desc sort. One comparator function definition. Add abstract
record batch,
vector container and vector wrapper. Various fixes to return to client when
query fails.
Updated Branches:
refs/heads/master 31f196497 -> db3afaa85
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
index ec17c63..38a4236 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
@@ -3,6 +3,7 @@ package org.apache.drill.exec.expr;
import static org.junit.Assert.assertEquals;
import mockit.Expectations;
import mockit.Injectable;
+import mockit.NonStrict;
import mockit.NonStrictExpectations;
import org.antlr.runtime.ANTLRStringStream;
@@ -23,8 +24,10 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.physical.impl.project.Projector;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
import org.junit.AfterClass;
import org.junit.Test;
@@ -37,15 +40,18 @@ public class ExpressionTest {
}
@Test
- public void testSpecial(final @Injectable RecordBatch batch) throws Exception {
- final TypedFieldId tfid = new TypedFieldId(Types.optional(MinorType.INT),0);
-
+ public void testSpecial(final @Injectable RecordBatch batch, @Injectable ValueVector vector) throws Exception {
+ final TypedFieldId tfid = new TypedFieldId(Types.optional(MinorType.INT),0, false);
+
new NonStrictExpectations() {
+ @NonStrict VectorWrapper<?> wrapper;
{
batch.getValueVectorId(new SchemaPath("alpha", ExpressionPosition.UNKNOWN));
result = tfid;
- batch.getValueVectorById(tfid.getFieldId(), IntVector.class);
- result = new IntVector(null, null);
+ batch.getValueAccessorById(tfid.getFieldId(), IntVector.class);
+ result = wrapper;
+ wrapper.getValueVector();
+ result = new IntVector(null, null);
}
};
@@ -54,7 +60,7 @@ public class ExpressionTest {
@Test
public void testSchemaExpression(final @Injectable RecordBatch batch) throws Exception {
- final TypedFieldId tfid = new TypedFieldId(Types.optional(MinorType.BIGINT), 0);
+ final TypedFieldId tfid = new TypedFieldId(Types.optional(MinorType.BIGINT), 0, false);
new Expectations() {
{
@@ -89,7 +95,7 @@ public class ExpressionTest {
}
CodeGenerator<Projector> cg = new CodeGenerator<Projector>(Projector.TEMPLATE_DEFINITION, new FunctionImplementationRegistry(DrillConfig.create()));
- cg.addExpr(new ValueVectorWriteExpression(-1, materializedExpr));
+ cg.addExpr(new ValueVectorWriteExpression(new TypedFieldId(materializedExpr.getMajorType(), -1), materializedExpr));
return cg.generate();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 5e7c599..dddd322 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -1,15 +1,19 @@
package org.apache.drill.exec.physical.impl;
import java.util.Iterator;
+import java.util.List;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.ValueVector;
+import com.beust.jcommander.internal.Lists;
+
public class SimpleRootExec implements RootExec, Iterable<ValueVector>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRootExec.class);
@@ -28,9 +32,10 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{
return incoming.getSelectionVector2();
}
+ @SuppressWarnings("unchecked")
public <T extends ValueVector> T getValueVectorById(SchemaPath path, Class<?> vvClass){
TypedFieldId tfid = incoming.getValueVectorId(path);
- return incoming.getValueVectorById(tfid.getFieldId(), vvClass);
+ return (T) incoming.getValueAccessorById(tfid.getFieldId(), vvClass).getValueVector();
}
@Override
@@ -40,11 +45,16 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{
@Override
public void stop() {
+ incoming.kill();
}
@Override
public Iterator<ValueVector> iterator() {
- return incoming.iterator();
+ List<ValueVector> vv = Lists.newArrayList();
+ for(VectorWrapper<?> vw : incoming){
+ vv.add(vw.getValueVector());
+ }
+ return vv.iterator();
}
public int getRecordCount(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index 7b002ea..e21289c 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -17,7 +17,7 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import java.util.List;
@@ -26,6 +26,7 @@ import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.UserProtos.QueryType;
import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
@@ -53,14 +54,14 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
int recordCount = 0;
for (QueryResultBatch batch : results) {
- if(!batch.hasData()) continue;
+
boolean schemaChanged = batchLoader.load(batch.getHeader().getDef(), batch.getData());
boolean firstColumn = true;
// print headers.
if (schemaChanged) {
System.out.println("\n\n========NEW SCHEMA=========\n\n");
- for (ValueVector value : batchLoader) {
+ for (VectorWrapper<?> value : batchLoader) {
if (firstColumn) {
firstColumn = false;
@@ -79,13 +80,13 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
for (int i = 0; i < batchLoader.getRecordCount(); i++) {
boolean first = true;
recordCount++;
- for (ValueVector value : batchLoader) {
+ for (VectorWrapper<?> value : batchLoader) {
if (first) {
first = false;
} else {
System.out.print("\t");
}
- System.out.print(value.getAccessor().getObject(i));
+ System.out.print(value.getValueVector().getAccessor().getObject(i));
}
if(!first) System.out.println();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
new file mode 100644
index 0000000..af5ec06
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
@@ -0,0 +1,145 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import static org.junit.Assert.assertTrue;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.IntVector;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+public class TestSimpleSort {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleSort.class);
+ DrillConfig c = DrillConfig.create();
+
+
+ @Test
+ public void sortOneKeyAscending(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{
+
+
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry("test");
+ bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ }};
+
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/sort/one_key_sort.json"), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+ int previousInt = Integer.MIN_VALUE;
+
+ int recordCount = 0;
+ int batchCount = 0;
+
+ while(exec.next()){
+ batchCount++;
+ IntVector c1 = exec.getValueVectorById(new SchemaPath("blue", ExpressionPosition.UNKNOWN), IntVector.class);
+ IntVector c2 = exec.getValueVectorById(new SchemaPath("green", ExpressionPosition.UNKNOWN), IntVector.class);
+
+ IntVector.Accessor a1 = c1.getAccessor();
+ IntVector.Accessor a2 = c2.getAccessor();
+
+ for(int i =0; i < c1.getAccessor().getValueCount(); i++){
+ recordCount++;
+ assert previousInt <= a1.get(i);
+ previousInt = a1.get(i);
+ assert previousInt == a2.get(i);
+ }
+
+
+ }
+
+ System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
+
+ if(context.getFailureCause() != null){
+ throw context.getFailureCause();
+ }
+ assertTrue(!context.isFailed());
+ }
+
+ @Test
+ public void sortTwoKeysOneAscendingOneDescending(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{
+
+
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry("test");
+ bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ }};
+
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/sort/two_key_sort.json"), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+ int previousInt = Integer.MIN_VALUE;
+ long previousLong = Long.MAX_VALUE;
+
+ int recordCount = 0;
+ int batchCount = 0;
+
+ while(exec.next()){
+ batchCount++;
+ IntVector c1 = exec.getValueVectorById(new SchemaPath("blue", ExpressionPosition.UNKNOWN), IntVector.class);
+ BigIntVector c2 = exec.getValueVectorById(new SchemaPath("alt", ExpressionPosition.UNKNOWN), BigIntVector.class);
+
+ IntVector.Accessor a1 = c1.getAccessor();
+ BigIntVector.Accessor a2 = c2.getAccessor();
+
+ for(int i =0; i < c1.getAccessor().getValueCount(); i++){
+ recordCount++;
+ assert previousInt <= a1.get(i);
+
+ if(previousInt != a1.get(i)){
+ previousLong = Long.MAX_VALUE;
+ previousInt = a1.get(i);
+ }
+
+ assert previousLong >= a2.get(i);
+
+ //System.out.println(previousInt + "\t" + a2.get(i));
+
+ }
+
+
+ }
+
+ System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
+
+ if(context.getFailureCause() != null){
+ throw context.getFailureCause();
+ }
+ assertTrue(!context.isFailed());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception{
+ // pause to get logger to catch up.
+ Thread.sleep(1000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index dae5f78..8a1736c 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -28,7 +28,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
import org.junit.Test;
import com.google.common.collect.Lists;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/resources/sort/one_key_sort.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/sort/one_key_sort.json b/sandbox/prototype/exec/java-exec/src/test/resources/sort/one_key_sort.json
new file mode 100644
index 0000000..3bd0b71
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/sort/one_key_sort.json
@@ -0,0 +1,44 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 1000000, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]},
+ {records: 1000000, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"sort",
+ orderings: [
+ {expr: "blue"}
+ ]
+ },
+ {
+ @id:3,
+ child: 2,
+ pop:"selection-vector-remover"
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/resources/sort/two_key_sort.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/sort/two_key_sort.json b/sandbox/prototype/exec/java-exec/src/test/resources/sort/two_key_sort.json
new file mode 100644
index 0000000..2394626
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/sort/two_key_sort.json
@@ -0,0 +1,54 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]},
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"project",
+ exprs: [
+ { ref: "blue", expr:"blue" },
+ { ref: "alt", expr:"alternate3()" }
+ ]
+ },
+ {
+ @id:3,
+ child: 2,
+ pop:"sort",
+ orderings: [
+ {expr: "blue"},
+ {expr: "alt", order: "DESC"}
+ ]
+ },
+ {
+ @id:4,
+ child: 3,
+ pop:"selection-vector-remover"
+ },
+ {
+ @id: 5,
+ child: 4,
+ pop: "screen"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
index b3a7e35..2209b45 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java
@@ -2,6 +2,7 @@ package org.apache.drill.sql.client.full;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
@@ -11,7 +12,7 @@ public class BatchListener implements UserResultsListener {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchListener.class);
- private RpcException ex;
+ private volatile RpcException ex;
private volatile boolean completed = false;
final BlockingQueue<QueryResultBatch> queue = new ArrayBlockingQueue<>(100);
@@ -36,11 +37,15 @@ public class BatchListener implements UserResultsListener {
}
public QueryResultBatch getNext() throws RpcException, InterruptedException{
- if(ex != null) throw ex;
- if(completed && queue.isEmpty()){
- return null;
- }else{
- return queue.take();
+ while(true){
+ if(ex != null) throw ex;
+ if(completed && queue.isEmpty()){
+ return null;
+ }else{
+ QueryResultBatch q = queue.poll(50, TimeUnit.MILLISECONDS);
+ if(q != null) return q;
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
index 4eb243d..a60f92b 100644
--- a/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
+++ b/sandbox/prototype/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchLoaderMap.java
@@ -10,6 +10,7 @@ import jline.internal.Preconditions;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.server.DrillbitContext;
@@ -46,8 +47,8 @@ public class BatchLoaderMap implements Map<String, Object> {
boolean schemaChanged = loader.load(batch.getHeader().getDef(), batch.getData());
if (schemaChanged) {
fields.clear();
- for (ValueVector v : loader) {
- fields.put(v.getField().getName(), v);
+ for (VectorWrapper<?> v : loader) {
+ fields.put(v.getField().getName(), v.getValueVector());
}
} else {
logger.debug("Schema didn't change. {}", batch);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
index 281871e..79b8ef8 100644
--- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
+++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/FullEngineTest.java
@@ -3,7 +3,11 @@ package org.apache.drill.jdbc.test;
import java.io.IOException;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
@@ -13,6 +17,12 @@ public class FullEngineTest {
private static String MODEL_FULL_ENGINE;
+ // Determine if we are in Eclipse Debug mode.
+ static final boolean IS_DEBUG = java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;
+
+ // Set a timeout unless we're debugging.
+ @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(10000);
+
@BeforeClass
public static void setupFixtures() throws IOException {
MODEL_FULL_ENGINE = Resources.toString(Resources.getResource("full-model.json"), Charsets.UTF_8);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
index 1900dc3..3bde4ed 100644
--- a/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
+++ b/sandbox/prototype/sqlparser/src/test/java/org/apache/drill/jdbc/test/JdbcTest.java
@@ -25,7 +25,11 @@ import java.sql.Statement;
import org.apache.drill.exec.ref.ReferenceInterpreter;
import org.junit.BeforeClass;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
@@ -36,6 +40,7 @@ public class JdbcTest {
private static String MODEL;
private static String EXPECTED;
+
@BeforeClass
public static void setupFixtures() throws IOException {
MODEL = Resources.toString(Resources.getResource("test-models.json"), Charsets.UTF_8);
[2/3] Sort operator using HyperBatch concept. Supports single and
multikey asc/desc sort. One comparator function definition. Add abstract
record batch,
vector container and vector wrapper. Various fixes to return to client when
query fails.
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
new file mode 100644
index 0000000..daf96fc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.collect.ArrayListMultimap;
+
+public class SortRecordBatchBuilder {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
+
+ private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
+ private final VectorContainer container;
+
+ private int recordCount;
+ private long runningBytes;
+ private long runningBatches;
+ private final long maxBytes;
+ private SelectionVector4 sv4;
+ final PreAllocator svAllocator;
+
+ public SortRecordBatchBuilder(BufferAllocator a, long maxBytes, VectorContainer container){
+ this.maxBytes = maxBytes;
+ this.svAllocator = a.getPreAllocator();
+ this.container = container;
+ }
+
+ private long getSize(RecordBatch batch){
+ long bytes = 0;
+ for(VectorWrapper<?> v : batch){
+ bytes += v.getValueVector().getBufferSize();
+ }
+ return bytes;
+ }
+
+ /**
+ * Add another record batch to the set of record batches.
+ * @param batch
+ * @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages.
+ * @throws SchemaChangeException
+ */
+ public boolean add(RecordBatch batch){
+ if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+ if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
+
+ long batchBytes = getSize(batch);
+ if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
+ if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch.
+ if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
+
+
+ RecordBatchData bd = new RecordBatchData(batch);
+ runningBytes += batchBytes;
+ batches.put(batch.getSchema(), bd);
+ recordCount += bd.getRecordCount();
+ return true;
+ }
+
+ public void build(FragmentContext context) throws SchemaChangeException{
+ container.clear();
+ if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema.");
+ if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+ sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
+ BatchSchema schema = batches.keySet().iterator().next();
+ List<RecordBatchData> data = batches.get(schema);
+
+ // now we're going to generate the sv4 pointers
+ switch(schema.getSelectionVectorMode()){
+ case NONE: {
+ int index = 0;
+ int recordBatchId = 0;
+ for(RecordBatchData d : data){
+ for(int i =0; i < d.getRecordCount(); i++, index++){
+ sv4.set(index, recordBatchId, i);
+ }
+ recordBatchId++;
+ }
+ break;
+ }
+ case TWO_BYTE: {
+ int index = 0;
+ int recordBatchId = 0;
+ for(RecordBatchData d : data){
+ for(int i =0; i < d.getRecordCount(); i++, index++){
+ sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
+ }
+ // might as well drop the selection vector since we'll stop using it now.
+ d.getSv2().clear();
+ recordBatchId++;
+ }
+ break;
+ }
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ // next, we'll create lists of each of the vector types.
+ ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
+ for(RecordBatchData rbd : batches.values()){
+ for(ValueVector v : rbd.vectors){
+ vectors.put(v.getField(), v);
+ }
+ }
+
+ for(MaterializedField f : vectors.keySet()){
+ List<ValueVector> v = vectors.get(f);
+ container.addHyperList(v);
+ }
+
+ container.buildSchema(SelectionVectorMode.FOUR_BYTE);
+ }
+
+ public SelectionVector4 getSv4() {
+ return sv4;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
new file mode 100644
index 0000000..c45f500
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -0,0 +1,45 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+public abstract class SortTemplate implements Sorter, IndexedSortable{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortTemplate.class);
+
+ private SelectionVector4 vector4;
+
+
+ public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException{
+ // we pass in the local hyperBatch since that is where we'll be reading data.
+ vector4 = hyperBatch.getSelectionVector4();
+ doSetup(context, hyperBatch, null);
+ }
+
+ @Override
+ public void sort(SelectionVector4 vector4, VectorContainer container){
+ QuickSort qs = new QuickSort();
+ qs.sort(this, 0, vector4.getTotalCount());
+ }
+
+ @Override
+ public void swap(int sv0, int sv1) {
+ int tmp = vector4.get(sv0);
+ vector4.set(sv0, vector4.get(sv1));
+ vector4.set(sv1, tmp);
+ }
+
+ @Override
+ public int compare(int inIndex, int outIndex) {
+ int sv1 = vector4.get(inIndex);
+ int sv2 = vector4.get(outIndex);
+ return doEval(sv1, sv2);
+ }
+
+ public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ public abstract int doEval(int inIndex, int outIndex);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
new file mode 100644
index 0000000..bc4fae5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
@@ -0,0 +1,17 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public interface Sorter {
+ public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException;
+ public void sort(SelectionVector4 vector4, VectorContainer container);
+
+ public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>( //
+ Sorter.class, "org.apache.drill.exec.physical.impl.sort.SortTemplate", Comparator.class, int.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 55d6ba2..ce17a2b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -8,8 +8,11 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
public interface Copier {
- public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION = new TemplateClassDefinition<Copier>( //
- Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate", CopyEvaluator.class, null);
+ public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>( //
+ Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate2", CopyEvaluator.class, null);
+
+ public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>( //
+ Copier.class, "org.apache.drill.exec.physical.impl.svremover.CopierTemplate4", CopyEvaluator.class, null);
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException;
public abstract void copyRecords();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
deleted file mode 100644
index 6a0e2c3..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.drill.exec.physical.impl.svremover;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-
-public abstract class CopierTemplate implements Copier{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate.class);
-
- private SelectionVector2 sv2;
- private VectorAllocator[] allocators;
-
- @Override
- public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
- this.allocators = allocators;
- this.sv2 = incoming.getSelectionVector2();
- doSetup(context, incoming, outgoing);
- }
-
- private void allocateVectors(int recordCount){
- for(VectorAllocator a : allocators){
- a.alloc(recordCount);
- }
- }
-
- @Override
- public void copyRecords(){
- final int recordCount = sv2.getCount();
- allocateVectors(recordCount);
- int outgoingPosition = 0;
-
- for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
- doEval(svIndex, outgoingPosition);
- }
- }
-
- public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
- public abstract void doEval(int incoming, int outgoing);
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
new file mode 100644
index 0000000..4dc38f2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -0,0 +1,43 @@
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+public abstract class CopierTemplate2 implements Copier{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class);
+
+ private SelectionVector2 sv2;
+ private VectorAllocator[] allocators;
+
+ @Override
+ public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
+ this.allocators = allocators;
+ this.sv2 = incoming.getSelectionVector2();
+ doSetup(context, incoming, outgoing);
+ }
+
+ private void allocateVectors(int recordCount){
+ for(VectorAllocator a : allocators){
+ a.alloc(recordCount);
+ }
+ }
+
+ @Override
+ public void copyRecords(){
+ final int recordCount = sv2.getCount();
+ allocateVectors(recordCount);
+ int outgoingPosition = 0;
+
+ for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
+ doEval(svIndex, outgoingPosition);
+ }
+ }
+
+ public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ public abstract void doEval(int incoming, int outgoing);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
new file mode 100644
index 0000000..2cf033e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -0,0 +1,47 @@
+package org.apache.drill.exec.physical.impl.svremover;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.VectorAllocator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class CopierTemplate4 implements Copier{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
+
+ private SelectionVector4 sv4;
+ private VectorAllocator[] allocators;
+
+
+ private void allocateVectors(int recordCount){
+ for(VectorAllocator a : allocators){
+ a.alloc(recordCount);
+ }
+ }
+
+ @Override
+ public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException{
+ this.allocators = allocators;
+ this.sv4 = incoming.getSelectionVector4();
+ doSetup(context, incoming, outgoing);
+ }
+
+
+ @Override
+ public void copyRecords(){
+ final int recordCount = sv4.getLength();
+ allocateVectors(recordCount);
+ int outgoingPosition = 0;
+ final int end = sv4.getStart() + sv4.getLength();
+ for(int svIndex = sv4.getStart(); svIndex < end; svIndex++, outgoingPosition++){
+ int deRefIndex = sv4.get(svIndex);
+ doEval(deRefIndex, outgoingPosition);
+ }
+ }
+
+ public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ public abstract void doEval(int incoming, int outgoing);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 68793b0..64e89ee 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -1,24 +1,22 @@
package org.apache.drill.exec.physical.impl.svremover;
import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.VectorHolder;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.TypeHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
@@ -31,36 +29,14 @@ import com.sun.codemodel.JExpression;
import com.sun.codemodel.JType;
import com.sun.codemodel.JVar;
-public class RemovingRecordBatch implements RecordBatch{
+public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
- private final RecordBatch incoming;
- private final FragmentContext context;
- private BatchSchema outSchema;
private Copier copier;
- private List<ValueVector> outputVectors;
- private VectorHolder vh;
private int recordCount;
- public RemovingRecordBatch(RecordBatch incoming, FragmentContext context){
- this.incoming = incoming;
- this.context = context;
- }
-
- @Override
- public Iterator<ValueVector> iterator() {
- return outputVectors.iterator();
- }
-
- @Override
- public FragmentContext getContext() {
- return context;
- }
-
- @Override
- public BatchSchema getSchema() {
- Preconditions.checkNotNull(outSchema);
- return outSchema;
+ public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) {
+ super(popConfig, context, incoming);
}
@Override
@@ -69,64 +45,36 @@ public class RemovingRecordBatch implements RecordBatch{
}
@Override
- public void kill() {
- incoming.kill();
- }
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public SelectionVector4 getSelectionVector4() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
- return vh.getValueVector(path);
- }
+ protected void setupNewSchema() throws SchemaChangeException {
+ container.clear();
+
+ switch(incoming.getSchema().getSelectionVectorMode()){
+ case NONE:
+ this.copier = getStraightCopier();
+ break;
+ case TWO_BYTE:
+ this.copier = getGenerated2Copier();
+ break;
+ case FOUR_BYTE:
+ this.copier = getGenerated4Copier();
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ container.buildSchema(SelectionVectorMode.NONE);
- @Override
- public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
- return vh.getValueVector(fieldId, clazz);
}
@Override
- public IterOutcome next() {
- recordCount = 0;
- IterOutcome upstream = incoming.next();
- logger.debug("Upstream... {}", upstream);
- switch(upstream){
- case NONE:
- case NOT_YET:
- case STOP:
- return upstream;
- case OK_NEW_SCHEMA:
- try{
- copier = createCopier();
- }catch(SchemaChangeException ex){
- incoming.kill();
- logger.error("Failure during query", ex);
- context.fail(ex);
- return IterOutcome.STOP;
- }
- // fall through.
- case OK:
- recordCount = incoming.getRecordCount();
- copier.copyRecords();
- for(ValueVector v : this.outputVectors){
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(recordCount);
- }
- return upstream; // change if upstream changed, otherwise normal.
- default:
- throw new UnsupportedOperationException();
+ protected void doWork() {
+ recordCount = incoming.getRecordCount();
+ copier.copyRecords();
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(recordCount);
}
}
-
-
private class StraightCopier implements Copier{
@@ -136,8 +84,8 @@ public class RemovingRecordBatch implements RecordBatch{
@Override
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators){
- for(ValueVector vv : incoming){
- TransferPair tp = vv.getTransferPair();
+ for(VectorWrapper<?> vv : incoming){
+ TransferPair tp = vv.getValueVector().getTransferPair();
pairs.add(tp);
out.add(tp.getTo());
}
@@ -159,23 +107,23 @@ public class RemovingRecordBatch implements RecordBatch{
private Copier getStraightCopier(){
StraightCopier copier = new StraightCopier();
copier.setupRemover(context, incoming, this, null);
- outputVectors.addAll(copier.getOut());
+ container.addCollection(copier.getOut());
return copier;
}
- private Copier getGeneratedCopier() throws SchemaChangeException{
+ private Copier getGenerated2Copier() throws SchemaChangeException{
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
List<VectorAllocator> allocators = Lists.newArrayList();
- for(ValueVector i : incoming){
- TransferPair t = i.getTransferPair();
- outputVectors.add(t.getTo());
- allocators.add(getAllocator(i, t.getTo()));
+ for(VectorWrapper<?> i : incoming){
+ ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+ container.add(v);
+ allocators.add(getAllocator(i.getValueVector(), v));
}
try {
- final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- generateCopies(cg);
+ final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
+ generateCopies(cg, false);
Copier copier = context.getImplementationClass(cg);
copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
return copier;
@@ -184,73 +132,77 @@ public class RemovingRecordBatch implements RecordBatch{
}
}
-
- private Copier createCopier() throws SchemaChangeException{
- if(outputVectors != null){
- for(ValueVector v : outputVectors){
- v.close();
- }
- }
- this.outputVectors = Lists.newArrayList();
- this.vh = new VectorHolder(outputVectors);
-
- SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.NONE);
- for(ValueVector v : incoming){
- bldr.addField(v.getField());
- }
- this.outSchema = bldr.build();
+ private Copier getGenerated4Copier() throws SchemaChangeException{
+ Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
- switch(incoming.getSchema().getSelectionVectorMode()){
- case NONE:
- return getStraightCopier();
- case TWO_BYTE:
- return getGeneratedCopier();
- default:
- throw new UnsupportedOperationException();
+ List<VectorAllocator> allocators = Lists.newArrayList();
+ for(VectorWrapper<?> i : incoming){
+
+ ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+ container.add(v);
+ allocators.add(getAllocator4(v));
}
+ try {
+ final CodeGenerator<Copier> cg = new CodeGenerator<Copier>(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
+ generateCopies(cg, true);
+ Copier copier = context.getImplementationClass(cg);
+ copier.setupRemover(context, incoming, this, allocators.toArray(new VectorAllocator[allocators.size()]));
+ return copier;
+ } catch (ClassTransformationException | IOException e) {
+ throw new SchemaChangeException("Failure while attempting to load generated class", e);
+ }
}
- private void generateCopies(CodeGenerator<Copier> g){
+ private void generateCopies(CodeGenerator<Copier> g, boolean hyper){
// we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
int fieldId = 0;
-
-
JExpression inIndex = JExpr.direct("inIndex");
JExpression outIndex = JExpr.direct("outIndex");
g.rotateBlock();
- for(ValueVector vv : incoming){
- JClass vvClass = (JClass) g.getModel()._ref(vv.getClass());
- JVar inVV = declareVVSetup("incoming", g, fieldId, vvClass);
- JVar outVV = declareVVSetup("outgoing", g, fieldId, vvClass);
+ for(VectorWrapper<?> vv : incoming){
+ JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), fieldId, vv.isHyper()));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+
+ if(hyper){
+
+ g.getBlock().add(
+ outVV
+ .invoke("copyFrom")
+ .arg(
+ inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+ .arg(outIndex)
+ .arg(
+ inVV.component(inIndex.shrz(JExpr.lit(16)))
+ )
+ );
+ }else{
+ g.getBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
+ }
- g.getBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
fieldId++;
}
}
- private JVar declareVVSetup(String varName, CodeGenerator<?> g, int fieldId, JClass vvClass){
- JVar vv = g.declareClassField("vv", vvClass);
- JClass t = (JClass) g.getModel()._ref(SchemaChangeException.class);
- JType objClass = g.getModel()._ref(Object.class);
- JBlock b = g.getSetupBlock();
- JVar obj = b.decl( //
- objClass, //
- g.getNextVar("tmp"), //
- JExpr.direct(varName).invoke("getValueVectorById").arg(JExpr.lit(fieldId)).arg( vvClass.dotclass()));
- b._if(obj.eq(JExpr._null()))._then()._throw(JExpr._new(t).arg(JExpr.lit(String.format("Failure while loading vector %s with id %d", vv.name(), fieldId))));
- b.assign(vv, JExpr.cast(vvClass, obj));
-
- return vv;
- }
-
+
@Override
public WritableBatch getWritableBatch() {
return WritableBatch.get(this);
}
+ private VectorAllocator getAllocator4(ValueVector outgoing){
+ if(outgoing instanceof FixedWidthVector){
+ return new FixedVectorAllocator((FixedWidthVector) outgoing);
+ }else if(outgoing instanceof VariableWidthVector ){
+ return new VariableEstimatedVector( (VariableWidthVector) outgoing, 50);
+ }else{
+ throw new UnsupportedOperationException();
+ }
+ }
+
+
private VectorAllocator getAllocator(ValueVector in, ValueVector outgoing){
if(outgoing instanceof FixedWidthVector){
return new FixedVectorAllocator((FixedWidthVector) outgoing);
@@ -273,11 +225,23 @@ public class RemovingRecordBatch implements RecordBatch{
out.allocateNew(recordCount);
out.getMutator().setValueCount(recordCount);
}
-
+ }
+
+ private class VariableEstimatedVector implements VectorAllocator{
+ VariableWidthVector out;
+ int avgWidth;
+ public VariableEstimatedVector(VariableWidthVector out, int avgWidth) {
+ super();
+ this.out = out;
+ this.avgWidth = avgWidth;
+ }
+ public void alloc(int recordCount){
+ out.allocateNew(avgWidth * recordCount, recordCount);
+ out.getMutator().setValueCount(recordCount);
+ }
}
-
private class VariableVectorAllocator implements VectorAllocator{
VariableWidthVector in;
VariableWidthVector out;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
index 4671baa..88708e2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
@@ -16,7 +16,7 @@ public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{
@Override
public RecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
- return new RemovingRecordBatch(children.iterator().next(), context);
+ return new RemovingRecordBatch(config, context, children.iterator().next());
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
new file mode 100644
index 0000000..a2584b8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -0,0 +1,78 @@
+package org.apache.drill.exec.record;
+
+import java.util.Iterator;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{
+ final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+
+ protected final VectorContainer container = new VectorContainer();
+ protected final T popConfig;
+ protected final FragmentContext context;
+
+ protected AbstractRecordBatch(T popConfig, FragmentContext context) {
+ super();
+ this.context = context;
+ this.popConfig = popConfig;
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return container.iterator();
+ }
+
+ @Override
+ public FragmentContext getContext() {
+ return context;
+ }
+
+ @Override
+ public BatchSchema getSchema() {
+ return container.getSchema();
+ }
+
+ @Override
+ public void kill() {
+ container.clear();
+ killIncoming();
+ cleanup();
+ }
+
+ protected abstract void killIncoming();
+
+ protected void cleanup(){
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return container.getValueVector(path);
+ }
+
+ @Override
+ public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+ return container.getVectorAccessor(fieldId, clazz);
+ }
+
+
+ @Override
+ public WritableBatch getWritableBatch() {
+ return WritableBatch.get(this);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
new file mode 100644
index 0000000..10cc7ab
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -0,0 +1,52 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSingleRecordBatch.class);
+
+ protected final RecordBatch incoming;
+
+ public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) {
+ super(popConfig, context);
+ this.incoming = incoming;
+ }
+
+ @Override
+ protected void killIncoming() {
+ incoming.kill();
+ }
+
+ @Override
+ public IterOutcome next() {
+ IterOutcome upstream = incoming.next();
+
+ switch(upstream){
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ container.clear();
+ return upstream;
+ case OK_NEW_SCHEMA:
+ try{
+ setupNewSchema();
+ }catch(SchemaChangeException ex){
+ kill();
+ logger.error("Failure during query", ex);
+ context.fail(ex);
+ return IterOutcome.STOP;
+ }
+ // fall through.
+ case OK:
+ doWork();
+ return upstream; // change if upstream changed, otherwise normal.
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ protected abstract void setupNewSchema() throws SchemaChangeException;
+ protected abstract void doWork();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
new file mode 100644
index 0000000..e8a5cf8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -0,0 +1,54 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
+
+ private T[] vectors;
+ private MaterializedField f;
+
+ public HyperVectorWrapper(MaterializedField f, T[] v){
+ assert(v.length > 0);
+ this.f = f;
+ this.vectors = v;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<T> getVectorClass() {
+ return (Class<T>) vectors.getClass().getComponentType();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return f;
+ }
+
+ @Override
+ public T getValueVector() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public T[] getValueVectors() {
+ return vectors;
+ }
+
+ @Override
+ public boolean isHyper() {
+ return true;
+ }
+
+ @Override
+ public void release() {
+ for(T x : vectors){
+ x.clear();
+ }
+
+ }
+
+ public static <T extends ValueVector> HyperVectorWrapper<T> create(MaterializedField f, T[] v){
+ return new HyperVectorWrapper<T>(f, v);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
deleted file mode 100644
index d820e0e..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/InvalidValueAccessor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-
-public class InvalidValueAccessor extends ExecutionSetupException{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InvalidValueAccessor.class);
-
- public InvalidValueAccessor() {
- super();
- }
-
- public InvalidValueAccessor(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
- public InvalidValueAccessor(String message, Throwable cause) {
- super(message, cause);
- }
-
- public InvalidValueAccessor(String message) {
- super(message);
- }
-
- public InvalidValueAccessor(Throwable cause) {
- super(cause);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index fc4e759..f747968 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.record;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -32,7 +31,7 @@ import org.apache.drill.exec.vector.ValueVector;
* A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids
* provided utilizing getValueVectorId();
*/
-public interface RecordBatch extends Iterable<ValueVector> {
+public interface RecordBatch extends Iterable<VectorWrapper<?>> {
/**
* Describes the outcome of a RecordBatch being incremented forward.
@@ -93,8 +92,7 @@ public interface RecordBatch extends Iterable<ValueVector> {
* TypedFieldId
*/
public abstract TypedFieldId getValueVectorId(SchemaPath path);
-
- public abstract <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz);
+ public abstract VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
/**
* Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
@@ -112,43 +110,4 @@ public interface RecordBatch extends Iterable<ValueVector> {
*/
public WritableBatch getWritableBatch();
- public static class TypedFieldId {
- final MajorType type;
- final int fieldId;
-
- public TypedFieldId(MajorType type, int fieldId) {
- super();
- this.type = type;
- this.fieldId = fieldId;
- }
-
- public MajorType getType() {
- return type;
- }
-
- public int getFieldId() {
- return fieldId;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TypedFieldId other = (TypedFieldId) obj;
- if (fieldId != other.fieldId)
- return false;
- if (type == null) {
- if (other.type != null)
- return false;
- } else if (!type.equals(other.type))
- return false;
- return true;
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index ebacd6e..593c28c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -29,18 +29,17 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
import org.apache.drill.exec.vector.TypeHelper;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-public class RecordBatchLoader implements Iterable<ValueVector>{
+public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
- private List<ValueVector> vectors = Lists.newArrayList();
+ private VectorContainer container = new VectorContainer();
private final BufferAllocator allocator;
private int recordCount;
private BatchSchema schema;
@@ -66,11 +65,12 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
boolean schemaChanged = false;
Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
- for(ValueVector v : this.vectors){
+ for(VectorWrapper<?> w : container){
+ ValueVector v = w.getValueVector();
oldFields.put(v.getField(), v);
}
- List<ValueVector> newVectors = Lists.newArrayList();
+ VectorContainer newVectors = new VectorContainer();
List<FieldMetadata> fields = def.getFieldList();
@@ -79,7 +79,7 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
FieldDef fieldDef = fmd.getDef();
ValueVector v = oldFields.remove(fieldDef);
if(v != null){
- newVectors.add(v);
+ container.add(v);
continue;
}
@@ -101,49 +101,51 @@ public class RecordBatchLoader implements Iterable<ValueVector>{
// rebuild the schema.
SchemaBuilder b = BatchSchema.newBuilder();
- for(ValueVector v : newVectors){
+ for(VectorWrapper<?> v : newVectors){
b.addField(v.getField());
}
b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
this.schema = b.build();
- vectors = ImmutableList.copyOf(newVectors);
+ container = newVectors;
return schemaChanged;
}
- public TypedFieldId getValueVector(SchemaPath path) {
- for(int i =0; i < vectors.size(); i++){
- ValueVector vv = vectors.get(i);
- if(vv.getField().matches(path)) return new TypedFieldId(vv.getField().getType(), i);
- }
- return null;
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return container.getValueVector(path);
}
- @SuppressWarnings("unchecked")
- public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) {
- ValueVector v = vectors.get(fieldId);
- assert v != null;
- if (v.getClass() != clazz){
- logger.warn(String.format(
- "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
- clazz.getCanonicalName(), v.getClass().getCanonicalName()));
- return null;
- }
- return (T) v;
- }
+
+
+//
+// @SuppressWarnings("unchecked")
+// public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) {
+// ValueVector v = container.get(fieldId);
+// assert v != null;
+// if (v.getClass() != clazz){
+// logger.warn(String.format(
+// "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
+// clazz.getCanonicalName(), v.getClass().getCanonicalName()));
+// return null;
+// }
+// return (T) v;
+// }
public int getRecordCount() {
return recordCount;
}
-
+ public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
+ return container.getVectorAccessor(fieldId, clazz);
+ }
+
public WritableBatch getWritableBatch(){
- return WritableBatch.getBatchNoSV(recordCount, vectors);
+ return WritableBatch.getBatchNoSVWrap(recordCount, container);
}
@Override
- public Iterator<ValueVector> iterator() {
- return this.vectors.iterator();
+ public Iterator<VectorWrapper<?>> iterator() {
+ return this.container.iterator();
}
public BatchSchema getSchema(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
deleted file mode 100644
index 9bc6e5f..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordMaker.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record;
-
-public class RecordMaker {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordMaker.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
deleted file mode 100644
index 86c963d..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordRemapper.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.apache.drill.exec.record;
-
-/**
- * Remove the selection vector from a record batch.
- */
-public class RecordRemapper {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordRemapper.class);
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
new file mode 100644
index 0000000..94700a2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -0,0 +1,49 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class);
+
+ private T v;
+
+ public SimpleVectorWrapper(T v){
+ this.v = v;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<T> getVectorClass() {
+ return (Class<T>) v.getClass();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return v.getField();
+ }
+
+ @Override
+ public T getValueVector() {
+ return v;
+ }
+
+ @Override
+ public T[] getValueVectors() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isHyper() {
+ return false;
+ }
+
+
+ @Override
+ public void release() {
+ v.clear();
+ }
+
+ public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v){
+ return new SimpleVectorWrapper<T>(v);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
new file mode 100644
index 0000000..3905fa8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
@@ -0,0 +1,58 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+
+public class TypedFieldId {
+ final MajorType type;
+ final int fieldId;
+ final boolean isHyperReader;
+
+ public TypedFieldId(MajorType type, int fieldId){
+ this(type, fieldId, false);
+ }
+
+ public TypedFieldId(MajorType type, int fieldId, boolean isHyper) {
+ super();
+ this.type = type;
+ this.fieldId = fieldId;
+ this.isHyperReader = isHyper;
+ }
+
+ public boolean isHyperReader(){
+ return isHyperReader;
+ }
+
+ public MajorType getType() {
+ return type;
+ }
+
+ public int getFieldId() {
+ return fieldId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TypedFieldId other = (TypedFieldId) obj;
+ if (fieldId != other.fieldId)
+ return false;
+ if (type == null) {
+ if (other.type != null)
+ return false;
+ } else if (!type.equals(other.type))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "TypedFieldId [type=" + type + ", fieldId=" + fieldId + ", isSuperReader=" + isHyperReader + "]";
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
new file mode 100644
index 0000000..923fbd5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -0,0 +1,130 @@
+package org.apache.drill.exec.record;
+
+import java.lang.reflect.Array;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Preconditions;
+
+public class VectorContainer implements Iterable<VectorWrapper<?>> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
+
+ private final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
+ private BatchSchema schema;
+
+ public VectorContainer() {
+ }
+
+ public VectorContainer(List<ValueVector> vectors, List<ValueVector[]> hyperVectors) {
+ assert !vectors.isEmpty() || !hyperVectors.isEmpty();
+
+ addCollection(vectors);
+
+ for (ValueVector[] vArr : hyperVectors) {
+ add(vArr);
+ }
+ }
+
+ public void addHyperList(List<ValueVector> vectors){
+ schema = null;
+ ValueVector[] vv = new ValueVector[vectors.size()];
+ for(int i =0; i < vv.length; i++){
+ vv[i] = vectors.get(i);
+ }
+ add(vv);
+ }
+
+ public void addCollection(Iterable<ValueVector> vectors) {
+ schema = null;
+ for (ValueVector vv : vectors) {
+ wrappers.add(SimpleVectorWrapper.create(vv));
+ }
+ }
+
+ public TypedFieldId add(ValueVector vv) {
+ schema = null;
+ int i = wrappers.size();
+ wrappers.add(SimpleVectorWrapper.create(vv));
+ return new TypedFieldId(vv.getField().getType(), i, false);
+ }
+
+ public void add(ValueVector[] hyperVector) {
+ assert hyperVector.length != 0;
+ schema = null;
+ Class<?> clazz = hyperVector[0].getClass();
+ ValueVector[] c = (ValueVector[]) Array.newInstance(clazz, hyperVector.length);
+ for (int i = 0; i < hyperVector.length; i++) {
+ c[i] = hyperVector[i];
+ }
+ // todo: work with a merged schema.
+ wrappers.add(HyperVectorWrapper.create(hyperVector[0].getField(), c));
+ }
+
+ public void remove(ValueVector v) {
+ schema = null;
+ for (Iterator<VectorWrapper<?>> iter = wrappers.iterator(); iter.hasNext();) {
+ VectorWrapper<?> w = iter.next();
+ if (!w.isHyper() && v == w.getValueVector()) {
+ iter.remove();
+ return;
+ }
+ }
+
+ throw new IllegalStateException("You attempted to remove a vector that didn't exist.");
+ }
+
+ public TypedFieldId getValueVector(SchemaPath path) {
+ for (int i = 0; i < wrappers.size(); i++) {
+ VectorWrapper<?> va = wrappers.get(i);
+ if (va.getField().matches(path))
+ return new TypedFieldId(va.getField().getType(), i, va.isHyper());
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends ValueVector> VectorWrapper<T> getVectorAccessor(int fieldId, Class<?> clazz) {
+ VectorWrapper<?> va = wrappers.get(fieldId);
+ assert va != null;
+ if (va.getVectorClass() != clazz) {
+ logger.warn(String.format(
+ "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
+ clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
+ return null;
+ }
+ return (VectorWrapper<T>) va;
+ }
+
+ public BatchSchema getSchema(){
+ Preconditions.checkNotNull(schema, "Schema is currently null. You must call buildSchema(SelectionVectorMode) before this container can return a schema.");
+ return schema;
+ }
+
+ public void buildSchema(SelectionVectorMode mode) {
+ SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(mode);
+ for (VectorWrapper<?> v : wrappers) {
+ bldr.addField(v.getField());
+ }
+ this.schema = bldr.build();
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return wrappers.iterator();
+ }
+
+ public void clear() {
+ // TODO: figure out a better approach for this.
+ // we don't clear schema because we want empty batches to carry previous schema to avoid extra schema update for no data.
+ // schema = null;
+ for (VectorWrapper<?> w : wrappers) {
+ w.release();
+ }
+ wrappers.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
new file mode 100644
index 0000000..e40dee4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -0,0 +1,14 @@
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+
+public interface VectorWrapper<T extends ValueVector> {
+
+ public Class<T> getVectorClass();
+ public MaterializedField getField();
+ public T getValueVector();
+ public T[] getValueVectors();
+ public boolean isHyper();
+ public void release();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 685cc77..e84bf37 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.ValueVector;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
@@ -57,6 +58,15 @@ public class WritableBatch {
return buffers;
}
+ public static WritableBatch getBatchNoSVWrap(int recordCount, Iterable<VectorWrapper<?>> vws) {
+ List<ValueVector> vectors = Lists.newArrayList();
+ for(VectorWrapper<?> vw : vws){
+ Preconditions.checkArgument(!vw.isHyper());
+ vectors.add(vw.getValueVector());
+ }
+ return getBatchNoSV(recordCount, vectors);
+ }
+
public static WritableBatch getBatchNoSV(int recordCount, Iterable<ValueVector> vectors) {
List<ByteBuf> buffers = Lists.newArrayList();
List<FieldMetadata> metadata = Lists.newArrayList();
@@ -83,7 +93,7 @@ public class WritableBatch {
public static WritableBatch get(RecordBatch batch) {
if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() != SelectionVectorMode.NONE) throw new UnsupportedOperationException("Only batches without selections vectors are writable.");
- return getBatchNoSV(batch.getRecordCount(), batch);
+ return getBatchNoSVWrap(batch.getRecordCount(), batch);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 1f3874f..2020f92 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -20,22 +20,58 @@ package org.apache.drill.exec.record.selection;
import io.netty.buffer.ByteBuf;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.DeadBuf;
+import org.apache.drill.exec.exception.SchemaChangeException;
public class SelectionVector4 {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
- private final BufferAllocator allocator;
- private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
-
- public SelectionVector4(BufferAllocator allocator) {
- this.allocator = allocator;
+ private final ByteBuf vector;
+ private final int recordCount;
+ private int start;
+ private int length;
+
+ public SelectionVector4(ByteBuf vector, int recordCount, int batchRecordCount) throws SchemaChangeException {
+ if(recordCount > Integer.MAX_VALUE /4) throw new SchemaChangeException(String.format("Currently, Drill can only support allocations up to 2gb in size. You requested an allocation of %d bytes.", recordCount * 4));
+ this.recordCount = recordCount;
+ this.start = 0;
+ this.length = Math.min(batchRecordCount, recordCount);
+ this.vector = vector;
}
-
- public int getCount(){
- return -1;
+
+ public int getTotalCount(){
+ return recordCount;
+ }
+
+ public int getCurrentCount(){
+ return length;
+ }
+
+ public void set(int index, int compound){
+ vector.setInt(index*4, compound);
+ }
+ public void set(int index, int recordBatch, int recordIndex){
+ vector.setInt(index*4, (recordBatch << 16) | (recordIndex & 65535));
+ }
+
+ public int get(int index){
+ return vector.getInt(index*4);
}
+ public int getStart() {
+ return start;
+ }
+ public int getLength() {
+ return length;
+ }
+
+ public boolean next(){
+ if(start + length == recordCount) return false;
+ start = start+length;
+ int newEnd = Math.min(start+length, recordCount);
+ length = newEnd - start;
+ return true;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
new file mode 100644
index 0000000..722f4d5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4Builder.java
@@ -0,0 +1,37 @@
+package org.apache.drill.exec.record.selection;
+
+import java.util.List;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.collect.Lists;
+
+public class SelectionVector4Builder {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4Builder.class);
+
+ private List<BatchSchema> schemas = Lists.newArrayList();
+
+ public void add(RecordBatch batch, boolean newSchema) throws SchemaChangeException{
+ if(!schemas.isEmpty() && newSchema) throw new SchemaChangeException("Currently, the sv4 builder doesn't support embedded types");
+ if(newSchema){
+ schemas.add(batch.getSchema());
+ }
+
+ }
+
+
+ // deals with managing selection vectors.
+ // take a four byte int
+ /**
+ * take a four byte value
+ * use the first two as a pointer. use the other two as a
+ *
+ * we should manage an array of valuevectors
+ */
+
+ private class VectorSchemaBuilder{
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 36fd199..b2283a2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcException;
@@ -54,29 +55,40 @@ public class QueryResultHandler {
final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
final QueryResultBatch batch = new QueryResultBatch(result, dBody);
UserResultsListener l = resultsListener.get(result.getQueryId());
+
+ boolean failed = batch.getHeader().getQueryState() == QueryState.FAILED;
// logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l);
- if (l != null) {
- // logger.debug("Results listener available, using existing.");
- l.resultArrived(batch);
- if (result.getIsLastChunk()) {
- resultsListener.remove(result.getQueryId(), l);
- }
- } else {
- logger.debug("Results listener not available, creating a buffering listener.");
- // manage race condition where we start getting results before we receive the queryid back.
+ if (l == null) {
BufferingListener bl = new BufferingListener();
l = resultsListener.putIfAbsent(result.getQueryId(), bl);
- if (l != null) {
- l.resultArrived(batch);
- } else {
- bl.resultArrived(batch);
- }
+ // if we had a succesful insert, use that reference. Otherwise, just throw away the new bufering listener.
+ if (l == null) l = bl;
}
+
+ if(failed){
+ l.submissionFailed(new RpcException("Remote failure while running query." + batch.getHeader().getErrorList()));
+ resultsListener.remove(result.getQueryId(), l);
+ }else{
+ l.resultArrived(batch);
+ }
+
+ if (
+ (failed || result.getIsLastChunk())
+ &&
+ (!(l instanceof BufferingListener) || ((BufferingListener)l).output != null)
+ ) {
+ resultsListener.remove(result.getQueryId(), l);
+ }
+
}
+
+
private class BufferingListener implements UserResultsListener {
private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+ private volatile boolean finished = false;
+ private volatile RpcException ex;
private volatile UserResultsListener output;
public boolean transferTo(UserResultsListener l) {
@@ -87,12 +99,19 @@ public class QueryResultHandler {
l.resultArrived(r);
last = r.getHeader().getIsLastChunk();
}
+ if(ex != null){
+ l.submissionFailed(ex);
+ return true;
+ }
return last;
}
}
+
@Override
public void resultArrived(QueryResultBatch result) {
+ if(result.getHeader().getIsLastChunk()) finished = true;
+
synchronized (this) {
if (output == null) {
this.results.add(result);
@@ -104,7 +123,18 @@ public class QueryResultHandler {
@Override
public void submissionFailed(RpcException ex) {
- throw new UnsupportedOperationException("You cannot report failed submissions to a buffering listener.");
+ finished = true;
+ synchronized (this) {
+ if (output == null){
+ this.ex = ex;
+ } else{
+ output.submissionFailed(ex);
+ }
+ }
+ }
+
+ public boolean isFinished(){
+ return finished;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index ad54a07..1b1e39a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -28,6 +28,7 @@ abstract class BaseDataValueVector extends BaseValueVector{
valueCount = 0;
}
}
+
@Override
public ByteBuf[] getBuffers(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
new file mode 100644
index 0000000..d43c38e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SerializableVector.java
@@ -0,0 +1,7 @@
+package org.apache.drill.exec.vector;
+
+
+public interface SerializableVector extends ValueVector{
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 717c087..dfe8e8c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -32,6 +32,8 @@ import org.apache.drill.exec.record.TransferPair;
*/
public interface ValueVector extends Closeable {
+ public int getBufferSize();
+
/**
* Alternative to clear(). Allows use as closeable in try-with-resources.
*/
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
index 7668bdc..9fd33b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
import org.apache.drill.exec.work.foreman.ErrorHelper;
-public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
+public abstract class AbstractFragmentRunnerListener implements FragmentRunnerListener{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractFragmentRunnerListener.class);
private FragmentContext context;
@@ -90,9 +90,9 @@ public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
statusChange(handle, statusBuilder.build());
}
- protected void statusChange(FragmentHandle handle, FragmentStatus status){
+ protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
- }
+
@Override
public final void fail(FragmentHandle handle, String message, Throwable excep) {
@@ -103,7 +103,6 @@ public class AbstractFragmentRunnerListener implements FragmentRunnerListener{
protected void fail(FragmentHandle handle, FragmentStatus.Builder statusBuilder){
statusChange(handle, statusBuilder.build());
- // TODO: ensure the foreman handles the exception
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
index 74fcd2b..ef7bcb1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
@@ -38,8 +38,10 @@ public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListen
this.tunnel = tunnel;
}
+
@Override
protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+ logger.debug("Sending remote failure.");
tunnel.sendFragmentStatus(status);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
index d4c4014..9a33109 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
@@ -31,11 +31,24 @@ public class ErrorHelper {
DrillPBError.Builder builder = DrillPBError.newBuilder();
builder.setEndpoint(endpoint);
builder.setErrorId(id);
+ StringBuilder sb = new StringBuilder();
if(message != null){
- builder.setMessage(message);
- }else{
- builder.setMessage(t.getMessage());
+ sb.append(message);
}
+
+ do{
+ sb.append(" < ");
+ sb.append(t.getClass().getSimpleName());
+ if(t.getMessage() != null){
+ sb.append(":[ ");
+ sb.append(t.getMessage());
+ sb.append(" ]");
+ }
+ }while(t.getCause() != null && t.getCause() != t);
+
+ builder.setMessage(sb.toString());
+
+
builder.setErrorType(0);
// record the error to the log for later reference.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 65fc8c7..af91a6b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
import org.apache.drill.exec.rpc.RpcException;
@@ -61,6 +62,7 @@ class RunningFragmentManager implements FragmentStatusListener{
private ForemanManagerListener foreman;
private AtomicInteger remainingFragmentCount;
private FragmentRunner rootRunner;
+ private volatile QueryId queryId;
public RunningFragmentManager(ForemanManagerListener foreman, TunnelManager tun) {
super();
@@ -72,6 +74,7 @@ class RunningFragmentManager implements FragmentStatusListener{
public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments) throws ExecutionSetupException{
remainingFragmentCount.set(leafFragments.size()+1);
+ queryId = rootFragment.getHandle().getQueryId();
// set up the root fragment first so we'll have incoming buffers available.
{
@@ -146,7 +149,7 @@ class RunningFragmentManager implements FragmentStatusListener{
private void fail(FragmentStatus status){
updateStatus(status);
stopQuery();
- QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.FAILED).build();
+ QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getError()).build();
foreman.cleanupAndSendResult(result);
}
@@ -262,7 +265,7 @@ class RunningFragmentManager implements FragmentStatusListener{
@Override
protected void statusChange(FragmentHandle handle, FragmentStatus status) {
- RunningFragmentManager.this.updateStatus(status);
+ RunningFragmentManager.this.statusUpdate(status);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
new file mode 100644
index 0000000..a41a883
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/expr/EscapeTest1.java
@@ -0,0 +1,181 @@
+package org.apache.drill.exec.expr;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+
+public final class EscapeTest1 {
+
+ public static class Timer{
+ long n1;
+ String name;
+ public Timer(String name){
+ this.n1 = System.nanoTime();
+ this.name = name;
+ }
+
+ public void print(long sum){
+ System.out.println(String.format("Completed %s in %d ms. Output was %d", name, (System.nanoTime() - n1)/1000/1000, sum));
+ }
+ }
+
+ public static Timer time(String name){
+ return new Timer(name);
+ }
+
+ public static void main(String args[]){
+ EscapeTest1 et = new EscapeTest1();
+ Monkey m = new Monkey();
+ for(int i =0; i < 10; i++){
+ time("noalloc").print(et.noAlloc());
+ time("alloc").print(et.alloc());
+ time("set noalloc").print(et.setNoAlloc(m));
+ time("set alloc").print(et.setAlloc(m));
+ time("get noalloc").print(et.getNoAlloc(m));
+ time("get alloc").print(et.getAlloc(m));
+ time("get return alloc").print(et.getReturnAlloc(m));
+ }
+ }
+
+ public long noAlloc(){
+ long sum = 0;
+ for(int i =0; i < 1000000000; i++){
+ sum+= add(i+1, i+2);
+ }
+ return sum;
+ }
+
+ public long alloc(){
+ long sum = 0;
+ for(int i =0; i < 1000000000; i++){
+ Ad ad = new Ad(i+1, i+2);
+ sum+= add(ad.x, ad.y);
+ }
+ return sum;
+ }
+
+ public long setAlloc(Monkey m){
+ long sum = 0;
+ for(int i =0; i < 490000000; i++){
+ EH h = new EH(i+1, i+2);
+ m.set(h);
+ sum += i;
+ }
+ return sum;
+ }
+
+ public long setNoAlloc(Monkey m){
+ long sum = 0;
+ for(int i =0; i < 490000000; i++){
+ m.set(i+1, i+2);
+ sum += i;
+ }
+ return sum;
+ }
+
+ public long getAlloc(Monkey m){
+ long sum = 0;
+ for(int i =0; i < 490000000; i++){
+ RR r = new RR();
+ m.get(i, i+1, r);
+ sum += r.v1 + r.v2;
+ }
+ return sum;
+ }
+
+ public long getNoAlloc(Monkey m){
+ long sum = 0;
+ for(int i =0; i < 490000000; i++){
+ int i1 = m.getV1(i);
+ int i2 = m.getV2(i+1);
+ sum += i1 + i2;
+ }
+ return sum;
+ }
+
+ public long getReturnAlloc(Monkey m){
+ long sum = 0;
+ for(int i =0; i < 490000000; i++){
+ RR r = m.get(i, i+1);
+ sum += r.v1 + r.v2;
+ }
+ return sum;
+ }
+
+
+ public class Ad{
+ long x;
+ long y;
+ public Ad(long x, long y) {
+ super();
+ this.x = x;
+ this.y = y;
+ }
+ }
+
+
+ public static final class EH{
+ int index;
+ int value;
+ public EH(int index, int value) {
+ super();
+ this.index = index;
+ this.value = value;
+ }
+ }
+
+ public static final class RR{
+ int v1;
+ int v2;
+
+ public RR(){
+
+ }
+ public RR(int v1, int v2) {
+ super();
+ this.v1 = v1;
+ this.v2 = v2;
+ }
+ }
+
+ public long add(long a, long b){
+ return a + b;
+ }
+
+
+ public final static class Monkey{
+ final IntBuffer buf;
+
+ public Monkey(){
+ ByteBuffer bb = ByteBuffer.allocateDirect(Integer.MAX_VALUE);
+ buf = bb.asIntBuffer();
+ }
+
+ public final void set(int index, int value){
+ buf.put(index, value);
+ }
+
+ public final void set(EH a){
+ buf.put(a.index, a.value);
+ }
+
+ public final int getV1(int index){
+ return buf.get(index);
+ }
+
+ public final int getV2(int index){
+ return buf.get(index);
+ }
+
+ public final RR get(int index1, int index2){
+ return new RR(buf.get(index1), buf.get(index2));
+ }
+
+ public final void get(int index1, int index2, RR rr){
+ rr.v1 = buf.get(index1);
+ rr.v2 = buf.get(index2);
+ }
+
+ }
+
+
+}
[3/3] git commit: Sort operator using HyperBatch concept. Supports
single and multikey asc/desc sort. One comparator function definition. Add
abstract record batch,
vector container and vector wrapper. Various fixes to return to client when
query fails.
Posted by ja...@apache.org.
Sort operator using HyperBatch concept. Supports single and multikey asc/desc sort.
One comparator function definition.
Add abstract record batch, vector container and vector wrapper.
Various fixes to return to client when query fails.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/db3afaa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/db3afaa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/db3afaa8
Branch: refs/heads/master
Commit: db3afaa854fc8475592907dba97162ecf869f9df
Parents: 31f1964
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Aug 4 15:47:31 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Aug 6 14:37:27 2013 -0700
----------------------------------------------------------------------
.../drill/common/expression/FunctionCall.java | 1 +
.../templates/NullableValueVectors.java | 2 +-
.../templates/RepeatedValueVectors.java | 2 +-
.../templates/VariableLengthVectors.java | 5 +-
.../apache/drill/exec/expr/CodeGenerator.java | 39 ++-
.../drill/exec/expr/EvaluationVisitor.java | 62 ++---
.../exec/expr/ExpressionTreeMaterializer.java | 2 +-
.../exec/expr/ValueVectorReadExpression.java | 33 ++-
.../exec/expr/ValueVectorWriteExpression.java | 13 +-
.../drill/exec/expr/fn/impl/Alternator.java | 53 ++--
.../exec/expr/fn/impl/ComparatorFunctions.java | 58 +++++
.../apache/drill/exec/ops/FragmentContext.java | 5 +-
.../apache/drill/exec/physical/base/Sender.java | 1 +
.../exec/physical/impl/FilterRecordBatch.java | 130 ----------
.../impl/FilteringRecordBatchTransformer.java | 58 -----
.../drill/exec/physical/impl/ImplCreator.java | 9 +
.../drill/exec/physical/impl/RootExec.java | 1 -
.../drill/exec/physical/impl/ScanBatch.java | 26 +-
.../drill/exec/physical/impl/ScreenCreator.java | 4 +-
.../exec/physical/impl/SingleSenderCreator.java | 2 +-
.../drill/exec/physical/impl/VectorHolder.java | 39 ---
.../exec/physical/impl/WireRecordBatch.java | 15 +-
.../physical/impl/filter/FilterRecordBatch.java | 129 ++--------
.../impl/materialize/RecordMaterializer.java | 6 -
.../impl/project/ProjectRecordBatch.java | 141 ++---------
.../exec/physical/impl/sort/Comparator.java | 11 +
.../physical/impl/sort/ReadIndexRewriter.java | 87 +++++++
.../physical/impl/sort/RecordBatchData.java | 47 ++++
.../exec/physical/impl/sort/SortBatch.java | 171 +++++++++++++
.../physical/impl/sort/SortBatchCreator.java | 23 ++
.../impl/sort/SortRecordBatchBuilder.java | 130 ++++++++++
.../exec/physical/impl/sort/SortTemplate.java | 45 ++++
.../drill/exec/physical/impl/sort/Sorter.java | 17 ++
.../exec/physical/impl/svremover/Copier.java | 7 +-
.../physical/impl/svremover/CopierTemplate.java | 43 ----
.../impl/svremover/CopierTemplate2.java | 43 ++++
.../impl/svremover/CopierTemplate4.java | 47 ++++
.../impl/svremover/RemovingRecordBatch.java | 246 ++++++++-----------
.../impl/svremover/SVRemoverCreator.java | 2 +-
.../drill/exec/record/AbstractRecordBatch.java | 78 ++++++
.../exec/record/AbstractSingleRecordBatch.java | 52 ++++
.../drill/exec/record/HyperVectorWrapper.java | 54 ++++
.../drill/exec/record/InvalidValueAccessor.java | 46 ----
.../apache/drill/exec/record/RecordBatch.java | 45 +---
.../drill/exec/record/RecordBatchLoader.java | 66 ++---
.../apache/drill/exec/record/RecordMaker.java | 22 --
.../drill/exec/record/RecordRemapper.java | 8 -
.../drill/exec/record/SimpleVectorWrapper.java | 49 ++++
.../apache/drill/exec/record/TypedFieldId.java | 58 +++++
.../drill/exec/record/VectorContainer.java | 130 ++++++++++
.../apache/drill/exec/record/VectorWrapper.java | 14 ++
.../apache/drill/exec/record/WritableBatch.java | 12 +-
.../exec/record/selection/SelectionVector4.java | 56 ++++-
.../selection/SelectionVector4Builder.java | 37 +++
.../drill/exec/rpc/user/QueryResultHandler.java | 60 +++--
.../drill/exec/vector/BaseDataValueVector.java | 1 +
.../drill/exec/vector/SerializableVector.java | 7 +
.../apache/drill/exec/vector/ValueVector.java | 2 +
.../work/AbstractFragmentRunnerListener.java | 7 +-
.../work/RemotingFragmentRunnerListener.java | 2 +
.../drill/exec/work/foreman/ErrorHelper.java | 19 +-
.../work/foreman/RunningFragmentManager.java | 7 +-
.../org/apache/drill/exec/expr/EscapeTest1.java | 181 ++++++++++++++
.../apache/drill/exec/expr/ExpressionTest.java | 22 +-
.../exec/physical/impl/SimpleRootExec.java | 16 +-
.../physical/impl/TestSimpleFragmentRun.java | 11 +-
.../exec/physical/impl/sort/TestSimpleSort.java | 145 +++++++++++
.../record/ExpressionTreeMaterializerTest.java | 1 -
.../src/test/resources/sort/one_key_sort.json | 44 ++++
.../src/test/resources/sort/two_key_sort.json | 54 ++++
.../drill/sql/client/full/BatchListener.java | 17 +-
.../drill/sql/client/full/BatchLoaderMap.java | 5 +-
.../apache/drill/jdbc/test/FullEngineTest.java | 10 +
.../org/apache/drill/jdbc/test/JdbcTest.java | 5 +
74 files changed, 2150 insertions(+), 948 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionCall.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionCall.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionCall.java
index d27b584..d343fd6 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionCall.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionCall.java
@@ -76,4 +76,5 @@ public class FunctionCall extends LogicalExpressionBase implements Iterable<Logi
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
index 644019e..ca222df 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
@@ -68,7 +68,7 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
values.clear();
}
- int getBufferSize(){
+ public int getBufferSize(){
return values.getBufferSize() + bits.getBufferSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
index 26564c1..79f05b6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
@@ -59,7 +59,7 @@ import com.google.common.collect.Lists;
return values.getValueCapacity();
}
- int getBufferSize(){
+ public int getBufferSize(){
return offsets.getBufferSize() + values.getBufferSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
index 95965f7..4492aa9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
@@ -49,7 +49,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
this.offsetVector = new UInt${type.width}Vector(null, allocator);
}
-
+ public int getBufferSize(){
+ return offsetVector.getBufferSize() + data.writerIndex();
+ }
+
int getSizeFromCount(int valueCount) {
return valueCount * ${type.width};
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
index 67ca614..9895782 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/CodeGenerator.java
@@ -12,14 +12,18 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.TypeHelper;
import com.google.common.base.Preconditions;
import com.sun.codemodel.JBlock;
+import com.sun.codemodel.JClass;
import com.sun.codemodel.JClassAlreadyExistsException;
import com.sun.codemodel.JCodeModel;
import com.sun.codemodel.JDefinedClass;
import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
import com.sun.codemodel.JFieldRef;
import com.sun.codemodel.JMethod;
import com.sun.codemodel.JMod;
@@ -61,11 +65,42 @@ public class CodeGenerator<T> {
}
+ public JVar declareVectorValueSetupAndMember(String batchName, TypedFieldId fieldId){
+ Class<?> valueVectorClass = TypeHelper.getValueVectorClass(fieldId.getType().getMinorType(), fieldId.getType().getMode());
+ JClass vvClass = model.ref(valueVectorClass);
+ JClass retClass = vvClass;
+ String vectorAccess = "getValueVector";
+ if(fieldId.isHyperReader()){
+ retClass = retClass.array();
+ vectorAccess = "getValueVectors";
+ }
+
+ JVar vv = declareClassField("vv", retClass);
+ JClass t = model.ref(SchemaChangeException.class);
+ JType wrapperClass = model.ref(VectorWrapper.class);
+ JType objClass = model.ref(Object.class);
+ JBlock b = getSetupBlock();
+ JVar obj = b.decl( //
+ objClass, //
+ getNextVar("tmp"), //
+ JExpr.direct(batchName)
+ .invoke("getValueAccessorById") //
+ .arg(JExpr.lit(fieldId.getFieldId())) //
+ .arg( vvClass.dotclass())
+ .invoke(vectorAccess)//
+ );
+
+ b._if(obj.eq(JExpr._null()))._then()._throw(JExpr._new(t).arg(JExpr.lit(String.format("Failure while loading vector %s with id: %s.", vv.name(), fieldId.toString()))));
+ //b.assign(vv, JExpr.cast(retClass, ((JExpression) JExpr.cast(wrapperClass, obj) ).invoke(vectorAccess)));
+ b.assign(vv, JExpr.cast(retClass, obj ));
+
+ return vv;
+ }
- public void addExpr(LogicalExpression ex){
+ public HoldingContainer addExpr(LogicalExpression ex){
logger.debug("Adding next write {}", ex);
rotateBlock();
- ex.accept(evaluationVisitor, this);
+ return ex.accept(evaluationVisitor, this);
}
public void rotateBlock(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 03a2a6b..93d5b5b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -1,7 +1,5 @@
package org.apache.drill.exec.expr;
-import com.google.common.base.Charsets;
-import com.sun.codemodel.*;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.IfExpression;
import org.apache.drill.common.expression.IfExpression.IfCondition;
@@ -13,15 +11,17 @@ import org.apache.drill.common.expression.ValueExpressions.LongExpression;
import org.apache.drill.common.expression.ValueExpressions.QuotedString;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.CodeGenerator.HoldingContainer;
import org.apache.drill.exec.expr.fn.FunctionHolder;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.TypeHelper;
-import com.google.common.base.Preconditions;
+import com.sun.codemodel.JBlock;
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JInvocation;
+import com.sun.codemodel.JVar;
public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, CodeGenerator<?>, RuntimeException> {
@@ -122,7 +122,7 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
@Override
public HoldingContainer visitUnknown(LogicalExpression e, CodeGenerator<?> generator) throws RuntimeException {
if(e instanceof ValueVectorReadExpression){
- return visitValueVectorExpression((ValueVectorReadExpression) e, generator);
+ return visitValueVectorReadExpression((ValueVectorReadExpression) e, generator);
}else if(e instanceof ValueVectorWriteExpression){
return visitValueVectorWriteExpression((ValueVectorWriteExpression) e, generator);
}else if(e instanceof ReturnValueExpression){
@@ -138,16 +138,7 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
HoldingContainer hc = child.accept(this, generator);
JBlock block = generator.getBlock();
- Class<?> vvClass = TypeHelper.getValueVectorClass(child.getMajorType().getMinorType(), child.getMajorType().getMode());
- JType vvType = generator.getModel()._ref(vvClass);
- JVar vv = generator.declareClassField("vv", vvType);
-
- // get value vector in setup block.
- JVar obj = generator.getSetupBlock().decl( //
- generator.getModel()._ref(Object.class), //
- generator.getNextVar("obj"), //
- JExpr.direct("outgoing").invoke("getValueVectorById").arg(JExpr.lit(e.getFieldId())).arg( ((JClass)vvType).dotclass()));
- generator.getSetupBlock().assign(vv, JExpr.cast(vvType, obj));
+ JVar vv = generator.declareVectorValueSetupAndMember("outgoing", e.getFieldId());
if(hc.isOptional()){
vv.invoke("getMutator").invoke("set").arg(JExpr.direct("outIndex"));
@@ -162,42 +153,42 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
return null;
}
- private HoldingContainer visitValueVectorExpression(ValueVectorReadExpression e, CodeGenerator<?> generator) throws RuntimeException{
+ private HoldingContainer visitValueVectorReadExpression(ValueVectorReadExpression e, CodeGenerator<?> generator) throws RuntimeException{
// declare value vector
- Class<?> vvClass = TypeHelper.getValueVectorClass(e.getMajorType().getMinorType(), e.getMajorType().getMode());
- JType vvType = generator.getModel()._ref(vvClass);
- JVar vv1 = generator.declareClassField("vv", vvType);
- // get value vector from incoming batch and
- // get value vector in setup block.
- JVar obj = generator.getSetupBlock().decl( //
- generator.getModel()._ref(Object.class), //
- generator.getNextVar("obj"), //
- JExpr.direct("incoming").invoke("getValueVectorById").arg(JExpr.lit(e.getFieldId())).arg( ((JClass)vvType).dotclass()));
- generator.getSetupBlock().assign(vv1, JExpr.cast(vvType, obj));
-
+ JVar vv1 = generator.declareVectorValueSetupAndMember("incoming", e.getFieldId());
+ final String indexVariableName = e.isNamed() ? e.getIndexVariableName() : "inIndex";
+ JExpression indexVariable = JExpr.direct(indexVariableName);
+
+ JInvocation getValueAccessor = vv1.invoke("getAccessor").invoke("get");
+ if(e.isSuperReader()){
+
+ getValueAccessor = ((JExpression) vv1.component(indexVariable.shrz(JExpr.lit(16)))).invoke("getAccessor").invoke("get");
+ indexVariable = indexVariable.band(JExpr.lit((int) Character.MAX_VALUE));
+ }
+
+
// evaluation work.
HoldingContainer out = generator.declare(e.getMajorType());
-
if(out.isOptional()){
JBlock blk = generator.getBlock();
- blk.assign(out.getIsSet(), vv1.invoke("getAccessor").invoke("isSet").arg(JExpr.direct("inIndex")));
+ blk.assign(out.getIsSet(), vv1.invoke("getAccessor").invoke("isSet").arg(indexVariable));
JConditional jc = blk._if(out.getIsSet().eq(JExpr.lit(1)));
if (e.getMajorType().getMinorType() == TypeProtos.MinorType.VARCHAR ||
e.getMajorType().getMinorType() == TypeProtos.MinorType.VARBINARY) {
jc._then()
- .add(vv1.invoke("getAccessor").invoke("get").arg(JExpr.direct("inIndex")).arg(out.getHolder()));
+ .add(getValueAccessor.arg(JExpr.direct("inIndex")).arg(out.getHolder()));
} else {
jc._then()
- .assign(out.getValue(), vv1.invoke("getAccessor").invoke("get").arg(JExpr.direct("inIndex")));
+ .assign(out.getValue(), getValueAccessor.arg(indexVariable));
}
}else{
if (e.getMajorType().getMinorType() == TypeProtos.MinorType.VARCHAR ||
e.getMajorType().getMinorType() == TypeProtos.MinorType.VARBINARY) {
- generator.getBlock().add(vv1.invoke("getAccessor").invoke("get").arg(JExpr.direct("inIndex")).arg(out.getHolder()));
+ generator.getBlock().add(getValueAccessor.arg(indexVariable).arg(out.getHolder()));
} else {
- generator.getBlock().assign(out.getValue(), vv1.invoke("getAccessor").invoke("get").arg(JExpr.direct("inIndex")));
+ generator.getBlock().assign(out.getValue(), getValueAccessor.arg(indexVariable));
}
}
return out;
@@ -215,6 +206,7 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
@Override
public HoldingContainer visitQuotedStringConstant(QuotedString e, CodeGenerator<?> CodeGenerator) throws RuntimeException {
throw new UnsupportedOperationException("We don't yet support string literals as we need to build the string value holders.");
+
// JExpr stringLiteral = JExpr.lit(e.value);
// CodeGenerator.block.decl(stringLiteral.invoke("getBytes").arg(JExpr.ref(Charsets.UTF_8));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 72e5c93..07db72c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -30,7 +30,7 @@ import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.visitors.SimpleExprVisitor;
import org.apache.drill.exec.record.NullExpression;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.TypedFieldId;
import com.google.common.collect.Lists;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
index b1888d6..f86bd29 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
@@ -4,19 +4,42 @@ import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.visitors.ExprVisitor;
import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
+import org.apache.drill.exec.record.TypedFieldId;
public class ValueVectorReadExpression implements LogicalExpression{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVectorReadExpression.class);
private final MajorType type;
- private final int fieldId;
+ private final TypedFieldId fieldId;
+ private final boolean superReader;
+ private final String indexVariableName;
- public ValueVectorReadExpression(TypedFieldId tfId) {
+ public ValueVectorReadExpression(TypedFieldId tfId){
+ this(tfId, null);
+ }
+
+ public ValueVectorReadExpression(TypedFieldId tfId, String indexVariableName) {
this.type = tfId.getType();
- this.fieldId = tfId.getFieldId();
+ this.fieldId = tfId;
+ this.superReader = tfId.isHyperReader();
+ this.indexVariableName = indexVariableName;
}
+ public boolean isNamed(){
+ return indexVariableName != null;
+ }
+
+ public String getIndexVariableName(){
+ return indexVariableName;
+ }
+
+ public TypedFieldId getTypedFieldId(){
+ return fieldId;
+ }
+
+ public boolean isSuperReader(){
+ return superReader;
+ }
@Override
public MajorType getMajorType() {
return type;
@@ -27,7 +50,7 @@ public class ValueVectorReadExpression implements LogicalExpression{
return visitor.visitUnknown(this, value);
}
- public int getFieldId() {
+ public TypedFieldId getFieldId() {
return fieldId;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorWriteExpression.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorWriteExpression.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorWriteExpression.java
index 417b975..58141d8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorWriteExpression.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorWriteExpression.java
@@ -5,19 +5,26 @@ import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.visitors.ExprVisitor;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.TypedFieldId;
public class ValueVectorWriteExpression implements LogicalExpression {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVectorWriteExpression.class);
- private final int fieldId;
+ private final TypedFieldId fieldId;
private final LogicalExpression child;
+ private final String indexVariableName;
- public ValueVectorWriteExpression(int fieldId, LogicalExpression child){
+ public ValueVectorWriteExpression(TypedFieldId fieldId, LogicalExpression child){
+ this(fieldId, child, null);
+ }
+
+ public ValueVectorWriteExpression(TypedFieldId fieldId, LogicalExpression child, String indexVariableName){
this.fieldId = fieldId;
this.child = child;
+ this.indexVariableName = indexVariableName;
}
- public int getFieldId() {
+ public TypedFieldId getFieldId() {
return fieldId;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
index 0915fa3..d2bcaa2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
@@ -16,24 +16,48 @@ import org.apache.drill.exec.vector.BigIntHolder;
-@FunctionTemplate(name = "alternate", scope = FunctionScope.SIMPLE)
-public class Alternator implements DrillFunc{
-
- @Workspace int val;
- @Output BigIntHolder out;
+public class Alternator {
- public void setup(RecordBatch incoming) {
- val = 0;
- }
+ @FunctionTemplate(name = "alternate", scope = FunctionScope.SIMPLE)
+ public static class Alternate2 implements DrillFunc{
+ @Workspace int val;
+ @Output BigIntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ val = 0;
+ }
+
+ public void eval() {
+ out.value = val;
+ if(val == 0){
+ val = 1;
+ }else{
+ val = 0;
+ }
+ }
+ }
- public void eval() {
- out.value = val;
- if(val == 0){
- val = 1;
- }else{
+ @FunctionTemplate(name = "alternate3", scope = FunctionScope.SIMPLE)
+ public static class Alternate3 implements DrillFunc{
+ @Workspace int val;
+ @Output BigIntHolder out;
+
+ public void setup(RecordBatch incoming) {
val = 0;
}
+
+
+ public void eval() {
+ out.value = val;
+ if(val == 0){
+ val = 1;
+ }else if(val == 1){
+ val = 2;
+ }else{
+ val = 0;
+ }
+ }
}
public static class Provider implements CallProvider{
@@ -41,7 +65,8 @@ public class Alternator implements DrillFunc{
@Override
public FunctionDefinition[] getFunctionDefintions() {
return new FunctionDefinition[]{
- FunctionDefinition.simple("alternate", NoArgValidator.VALIDATOR, new OutputTypeDeterminer.FixedType(Types.required(MinorType.BIGINT)), "alternate")
+ FunctionDefinition.simple("alternate", NoArgValidator.VALIDATOR, new OutputTypeDeterminer.FixedType(Types.required(MinorType.BIGINT)), "alternate"),
+ FunctionDefinition.simple("alternate3", NoArgValidator.VALIDATOR, new OutputTypeDeterminer.FixedType(Types.required(MinorType.BIGINT)), "alternate3")
};
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ComparatorFunctions.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ComparatorFunctions.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ComparatorFunctions.java
new file mode 100644
index 0000000..471bdf0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ComparatorFunctions.java
@@ -0,0 +1,58 @@
+package org.apache.drill.exec.expr.fn.impl;
+
+import org.apache.drill.common.expression.ArgumentValidators;
+import org.apache.drill.common.expression.CallProvider;
+import org.apache.drill.common.expression.FunctionDefinition;
+import org.apache.drill.common.expression.OutputTypeDeterminer;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.DrillFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.vector.BigIntHolder;
+import org.apache.drill.exec.vector.IntHolder;
+
+public class ComparatorFunctions {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComparatorFunctions.class);
+
+ @FunctionTemplate(name = "compare_to", scope = FunctionTemplate.FunctionScope.SIMPLE)
+ public static class IntComparator implements DrillFunc {
+
+ @Param IntHolder left;
+ @Param IntHolder right;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch b) {}
+
+ public void eval() {
+ out.value = left.value < right.value ? -1 : ((left.value == right.value)? 0 : 1);
+ }
+ }
+
+ @FunctionTemplate(name = "compare_to", scope = FunctionTemplate.FunctionScope.SIMPLE)
+ public static class Long implements DrillFunc {
+
+ @Param BigIntHolder left;
+ @Param BigIntHolder right;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch b) {}
+
+ public void eval() {
+ out.value = left.value < right.value ? -1 : ((left.value == right.value)? 0 : 1);
+ }
+ }
+ public static final FunctionDefinition COMPARE_TO = FunctionDefinition.simple("compare_to", new ArgumentValidators.AllowedTypeList(2, Types.required(MinorType.INT)), new OutputTypeDeterminer.FixedType(Types.required(MinorType.INT)));
+ public static class Provider implements CallProvider{
+
+ @Override
+ public FunctionDefinition[] getFunctionDefintions() {
+ return new FunctionDefinition[]{
+ COMPARE_TO
+ };
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 64f3eb4..bbe4cfb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -19,16 +19,13 @@ package org.apache.drill.exec.ops;
import java.io.IOException;
-import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.compile.ClassTransformer;
import org.apache.drill.exec.compile.QueryClassLoader;
-import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
-import org.apache.drill.exec.physical.impl.FilteringRecordBatchTransformer;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
@@ -106,7 +103,7 @@ public class FragmentContext {
public <T> T getImplementationClass(CodeGenerator<T> cg) throws ClassTransformationException, IOException{
long t1 = System.nanoTime();
T t= transformer.getImplementationClass(this.loader, cg.getDefinition(), cg.generate(), cg.getMaterializedClassName());
- logger.debug("Compile time: {} micros.", (System.nanoTime() - t1)/1000/1000 );
+ logger.debug("Compile time: {} millis.", (System.nanoTime() - t1)/1000/1000 );
return t;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
index 71513c7..77d3f69 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java
@@ -41,4 +41,5 @@ public interface Sender extends FragmentRoot {
*/
@JsonProperty("receiver-major-fragment")
public int getOppositeMajorFragmentId();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
deleted file mode 100644
index acabe30..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.impl;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
-
-public abstract class FilterRecordBatch implements RecordBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
-
- private RecordBatch incoming;
- private SelectionVector2 selectionVector;
- private BatchSchema schema;
- private FilteringRecordBatchTransformer transformer;
- private int outstanding;
-
- public FilterRecordBatch(RecordBatch batch) {
- this.incoming = batch;
- }
-
- @Override
- public FragmentContext getContext() {
- return incoming.getContext();
- }
-
- @Override
- public BatchSchema getSchema() {
- return schema;
- }
-
- @Override
- public int getRecordCount() {
- return 0;
- }
-
- @Override
- public void kill() {
- incoming.kill();
- }
-
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- return null;
- }
-
- @Override
- public SelectionVector4 getSelectionVector4() {
- return null;
- }
-
- @Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
- return null;
- }
-
- @Override
- public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> vvClass) {
- return null;
- }
-
- public WritableBatch getWritableBatch() {
- return null;
- }
-
- abstract int applyFilter(SelectionVector2 vector, int count);
-
- /**
- * Release all assets.
- */
- private void close() {
-
- }
-
- @Override
- public IterOutcome next() {
- while (true) {
- IterOutcome o = incoming.next();
- switch (o) {
- case OK_NEW_SCHEMA:
- transformer = null;
- schema = transformer.getSchema();
- // fall through to ok.
- case OK:
-
- case NONE:
- case STOP:
- close();
- return IterOutcome.STOP;
- }
-
- if (outstanding > 0) {
- // move data to output location.
-
- for (int i = incoming.getRecordCount() - outstanding; i < incoming.getRecordCount(); i++) {
-
- }
- }
-
-// // make sure the bit vector is as large as the current record batch.
-// if (selectionVector.capacity() < incoming.getRecordCount()) {
-// selectionVector.allocateNew(incoming.getRecordCount());
-// }
-
- return null;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
deleted file mode 100644
index 0813481..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.impl;
-
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-
-public abstract class FilteringRecordBatchTransformer {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilteringRecordBatchTransformer.class);
-
- final RecordBatch incoming;
- final SelectionVector2 selectionVector;
- final BatchSchema schema;
-
- public FilteringRecordBatchTransformer(RecordBatch incoming, OutputMutator output, SelectionVector2 selectionVector) {
- super();
- this.incoming = incoming;
- this.selectionVector = selectionVector;
- this.schema = innerSetup();
- }
-
- public abstract BatchSchema innerSetup();
-
- /**
- * Applies the filter to the selection index. Ignores any values in the selection vector, instead creating a.
- * @return
- */
- public abstract int apply();
-
- /**
- * Applies the filter to the selection index. Utilizes the existing selection index and only evaluates on those records.
- * @return
- */
- public abstract int applyWithSelection();
-
- public BatchSchema getSchema() {
- return schema;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 7740955..c31e9e4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -34,8 +34,10 @@ import org.apache.drill.exec.physical.config.RandomReceiver;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
+import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -52,6 +54,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private ProjectBatchCreator pbc = new ProjectBatchCreator();
private FilterBatchCreator fbc = new FilterBatchCreator();
private SVRemoverCreator svc = new SVRemoverCreator();
+ private SortBatchCreator sbc = new SortBatchCreator();
private RootExec root = null;
private ImplCreator(){}
@@ -88,6 +91,12 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
}
}
+
+ @Override
+ public RecordBatch visitSort(Sort sort, FragmentContext context) throws ExecutionSetupException {
+ return sbc.getBatch(context, sort, getChildren(sort, context));
+ }
+
@Override
public RecordBatch visitScreen(Screen op, FragmentContext context) throws ExecutionSetupException {
Preconditions.checkArgument(root == null);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index 3f8aac7..9bcfe10 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -17,7 +17,6 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl;
-import org.apache.drill.exec.exception.FragmentSetupException;
/**
* A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a8a62ca..5a543b0 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.physical.impl;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -29,13 +28,15 @@ import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -44,10 +45,9 @@ import com.google.common.collect.Maps;
public class ScanBatch implements RecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
- final List<ValueVector> vectors = Lists.newLinkedList();
final Map<MaterializedField, ValueVector> fieldVectorMap = Maps.newHashMap();
- private VectorHolder holder = new VectorHolder(vectors);
+ private VectorContainer holder = new VectorContainer();
private BatchSchema schema;
private int recordCount;
private boolean schemaChanged = true;
@@ -91,9 +91,7 @@ public class ScanBatch implements RecordBatch {
}
private void releaseAssets() {
- for (ValueVector v : vectors) {
- v.close();
- }
+ holder.clear();
}
@Override
@@ -139,10 +137,12 @@ public class ScanBatch implements RecordBatch {
}
@Override
- public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
- return holder.getValueVector(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+ return holder.getVectorAccessor(fieldId, clazz);
}
+
+
private class Mutator implements OutputMutator {
private SchemaBuilder builder = BatchSchema.newBuilder();
@@ -150,12 +150,12 @@ public class ScanBatch implements RecordBatch {
schemaChanged();
ValueVector vector = fieldVectorMap.remove(field);
if (vector == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
- vectors.remove(vector);
+ holder.remove(vector);
vector.close();
}
public void addField(ValueVector vector) {
- vectors.add(vector);
+ holder.add(vector);
fieldVectorMap.put(vector.getField(), vector);
builder.addField(vector.getField());
}
@@ -169,8 +169,8 @@ public class ScanBatch implements RecordBatch {
}
@Override
- public Iterator<ValueVector> iterator() {
- return vectors.iterator();
+ public Iterator<VectorWrapper<?>> iterator() {
+ return holder.iterator();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 516b5af..5c5e2e5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -17,8 +17,6 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl;
-import io.netty.buffer.ByteBuf;
-
import java.util.List;
import org.apache.drill.exec.ops.FragmentContext;
@@ -27,7 +25,6 @@ import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.record.RecordBatch;
@@ -45,6 +42,7 @@ public class ScreenCreator implements RootCreator<Screen>{
@Override
public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
+ Preconditions.checkNotNull(children);
Preconditions.checkArgument(children.size() == 1);
return new ScreenRoot(context, children.iterator().next());
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 231bc6c..bad3a03 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -76,8 +76,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
return false;
- case OK:
case OK_NEW_SCHEMA:
+ case OK:
FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
tunnel.sendRecordBatch(new RecordSendFailure(), context, batch);
return true;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
deleted file mode 100644
index 65a7365..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/VectorHolder.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package org.apache.drill.exec.physical.impl;
-
-import java.util.List;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.record.RecordBatch.TypedFieldId;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class VectorHolder {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorHolder.class);
-
- private List<ValueVector> vectors;
-
- public VectorHolder(List<ValueVector> vectors) {
- super();
- this.vectors = vectors;
- }
-
- public TypedFieldId getValueVector(SchemaPath path) {
- for(int i =0; i < vectors.size(); i++){
- ValueVector vv = vectors.get(i);
- if(vv.getField().matches(path)) return new TypedFieldId(vv.getField().getType(), i);
- }
- return null;
- }
-
- @SuppressWarnings("unchecked")
- public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) {
- ValueVector v = vectors.get(fieldId);
- assert v != null;
- if (v.getClass() != clazz){
- logger.warn(String.format(
- "Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
- clazz.getCanonicalName(), v.getClass().getCanonicalName()));
- return null;
- }
- return (T) v;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index a575f69..93f643d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -24,11 +24,12 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.record.RawFragmentBatchProvider;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -70,7 +71,7 @@ public class WireRecordBatch implements RecordBatch{
}
@Override
- public Iterator<ValueVector> iterator() {
+ public Iterator<VectorWrapper<?>> iterator() {
return batchLoader.iterator();
}
@@ -86,14 +87,14 @@ public class WireRecordBatch implements RecordBatch{
@Override
public TypedFieldId getValueVectorId(SchemaPath path) {
- return batchLoader.getValueVector(path);
+ return batchLoader.getValueVectorId(path);
}
-
+
@Override
- public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz){
- return batchLoader.getValueVector(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+ return batchLoader.getValueAccessorById(fieldId, clazz);
}
-
+
@Override
public IterOutcome next() {
RawFragmentBatch batch = fragProvider.getNext();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index babc66e..806ead6 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -1,144 +1,68 @@
package org.apache.drill.exec.physical.impl.filter;
import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Filter;
-import org.apache.drill.exec.physical.impl.VectorHolder;
-import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class FilterRecordBatch implements RecordBatch{
+public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
- private final Filter filterConfig;
- private final RecordBatch incoming;
- private final FragmentContext context;
private final SelectionVector2 sv;
- private BatchSchema outSchema;
private Filterer filter;
- private List<ValueVector> outputVectors;
- private VectorHolder vh;
public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context){
- this.filterConfig = pop;
- this.incoming = incoming;
- this.context = context;
+ super(pop, context, incoming);
sv = new SelectionVector2(context.getAllocator());
}
-
@Override
public FragmentContext getContext() {
return context;
}
@Override
- public BatchSchema getSchema() {
- Preconditions.checkNotNull(outSchema);
- return outSchema;
- }
-
- @Override
public int getRecordCount() {
return sv.getCount();
}
@Override
- public void kill() {
- incoming.kill();
- }
-
-
- @Override
- public Iterator<ValueVector> iterator() {
- return outputVectors.iterator();
- }
-
- @Override
public SelectionVector2 getSelectionVector2() {
return sv;
}
@Override
- public SelectionVector4 getSelectionVector4() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
- return vh.getValueVector(path);
- }
-
- @Override
- public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
- return vh.getValueVector(fieldId, clazz);
- }
-
- @Override
- public IterOutcome next() {
-
- IterOutcome upstream = incoming.next();
- logger.debug("Upstream... {}", upstream);
- switch(upstream){
- case NONE:
- case NOT_YET:
- case STOP:
- return upstream;
- case OK_NEW_SCHEMA:
- try{
- filter = createNewFilterer();
- }catch(SchemaChangeException ex){
- incoming.kill();
- logger.error("Failure during query", ex);
- context.fail(ex);
- return IterOutcome.STOP;
- }
- // fall through.
- case OK:
- int recordCount = incoming.getRecordCount();
- sv.allocateNew(recordCount);
- filter.filterBatch(recordCount);
- for(ValueVector v : this.outputVectors){
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(recordCount);
- }
- return upstream; // change if upstream changed, otherwise normal.
- default:
- throw new UnsupportedOperationException();
+ protected void doWork() {
+ int recordCount = incoming.getRecordCount();
+ sv.allocateNew(recordCount);
+ filter.filterBatch(recordCount);
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(recordCount);
}
}
-
- private Filterer createNewFilterer() throws SchemaChangeException{
- if(outputVectors != null){
- for(ValueVector v : outputVectors){
- v.close();
- }
- }
- this.outputVectors = Lists.newArrayList();
- this.vh = new VectorHolder(outputVectors);
- LogicalExpression filterExpression = filterConfig.getExpr();
+ @Override
+ protected void setupNewSchema() throws SchemaChangeException {
+ container.clear();
+ LogicalExpression filterExpression = popConfig.getExpr();
final ErrorCollector collector = new ErrorCollectorImpl();
final List<TransferPair> transfers = Lists.newArrayList();
final CodeGenerator<Filterer> cg = new CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
@@ -150,32 +74,21 @@ public class FilterRecordBatch implements RecordBatch{
cg.addExpr(new ReturnValueExpression(expr));
- for(ValueVector v : incoming){
- TransferPair pair = v.getTransferPair();
- outputVectors.add(pair.getTo());
+ for(VectorWrapper<?> v : incoming){
+ TransferPair pair = v.getValueVector().getTransferPair();
+ container.add(pair.getTo());
transfers.add(pair);
}
- SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(SelectionVectorMode.TWO_BYTE);
- for(ValueVector v : outputVectors){
- bldr.addField(v.getField());
- }
- this.outSchema = bldr.build();
+ container.buildSchema(SelectionVectorMode.TWO_BYTE);
try {
TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
- Filterer filterer = context.getImplementationClass(cg);
- filterer.setup(context, incoming, this, tx);
- return filterer;
+ this.filter = context.getImplementationClass(cg);
+ filter.setup(context, incoming, this, tx);
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
}
- @Override
- public WritableBatch getWritableBatch() {
- return WritableBatch.get(this);
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
index 17c65e9..b62e52b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
@@ -17,12 +17,6 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl.materialize;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserProtos.QueryResult;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.WritableBatch;
public interface RecordMaterializer {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 29660cb..8f06290 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -1,7 +1,6 @@
package org.apache.drill.exec.physical.impl.project;
import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.expression.ErrorCollector;
@@ -20,19 +19,15 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.impl.VectorHolder;
import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
-import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.TypeHelper;
import org.apache.drill.exec.vector.ValueVector;
@@ -40,118 +35,39 @@ import org.apache.drill.exec.vector.ValueVector;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class ProjectRecordBatch implements RecordBatch{
+public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
- private final Project pop;
- private final RecordBatch incoming;
- private final FragmentContext context;
- private BatchSchema outSchema;
private Projector projector;
private List<ValueVector> allocationVectors;
- private List<ValueVector> outputVectors;
- private VectorHolder vh;
-
public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context){
- this.pop = pop;
- this.incoming = incoming;
- this.context = context;
+ super(pop, context, incoming);
}
@Override
- public Iterator<ValueVector> iterator() {
- return outputVectors.iterator();
- }
-
- @Override
- public FragmentContext getContext() {
- return context;
- }
-
- @Override
- public BatchSchema getSchema() {
- Preconditions.checkNotNull(outSchema);
- return outSchema;
- }
-
- @Override
public int getRecordCount() {
return incoming.getRecordCount();
}
@Override
- public void kill() {
- incoming.kill();
- }
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public SelectionVector4 getSelectionVector4() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
- return vh.getValueVector(path);
- }
-
- @Override
- public <T extends ValueVector> T getValueVectorById(int fieldId, Class<?> clazz) {
- return vh.getValueVector(fieldId, clazz);
- }
-
- @Override
- public IterOutcome next() {
-
- IterOutcome upstream = incoming.next();
- logger.debug("Upstream... {}", upstream);
- switch(upstream){
- case NONE:
- case NOT_YET:
- case STOP:
- return upstream;
- case OK_NEW_SCHEMA:
- try{
- projector = createNewProjector();
- }catch(SchemaChangeException ex){
- incoming.kill();
- logger.error("Failure during query", ex);
- context.fail(ex);
- return IterOutcome.STOP;
- }
- // fall through.
- case OK:
- int recordCount = incoming.getRecordCount();
- for(ValueVector v : this.allocationVectors){
- AllocationHelper.allocate(v, recordCount, 50);
- }
- projector.projectRecords(recordCount, 0);
- for(ValueVector v : this.outputVectors){
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(recordCount);
- }
- return upstream; // change if upstream changed, otherwise normal.
- default:
- throw new UnsupportedOperationException();
+ protected void doWork() {
+ int recordCount = incoming.getRecordCount();
+ for(ValueVector v : this.allocationVectors){
+ AllocationHelper.allocate(v, recordCount, 50);
+ }
+ projector.projectRecords(recordCount, 0);
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(recordCount);
}
}
-
- private Projector createNewProjector() throws SchemaChangeException{
+ @Override
+ protected void setupNewSchema() throws SchemaChangeException{
this.allocationVectors = Lists.newArrayList();
- if(outputVectors != null){
- for(ValueVector v : outputVectors){
- v.close();
- }
- }
- this.outputVectors = Lists.newArrayList();
- this.vh = new VectorHolder(outputVectors);
- final List<NamedExpression> exprs = pop.getExprs();
+ container.clear();
+ final List<NamedExpression> exprs = popConfig.getExprs();
final ErrorCollector collector = new ErrorCollectorImpl();
final List<TransferPair> transfers = Lists.newArrayList();
@@ -168,43 +84,34 @@ public class ProjectRecordBatch implements RecordBatch{
// add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE){
ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
- ValueVector vvIn = incoming.getValueVectorById(vectorRead.getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode()));
+ ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
Preconditions.checkNotNull(incoming);
TransferPair tp = vvIn.getTransferPair();
transfers.add(tp);
- outputVectors.add(tp.getTo());
+ container.add(tp.getTo());
+ logger.debug("Added transfer.");
}else{
// need to do evaluation.
ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
allocationVectors.add(vector);
- outputVectors.add(vector);
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(outputVectors.size() - 1, expr);
+ ValueVectorWriteExpression write = new ValueVectorWriteExpression(container.add(vector), expr);
cg.addExpr(write);
+ logger.debug("Added eval.");
}
}
- SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(incoming.getSchema().getSelectionVectorMode());
- for(ValueVector v : outputVectors){
- bldr.addField(v.getField());
- }
- this.outSchema = bldr.build();
+ container.buildSchema(incoming.getSchema().getSelectionVectorMode());
try {
- Projector projector = context.getImplementationClass(cg);
+ this.projector = context.getImplementationClass(cg);
projector.setup(context, incoming, this, transfers);
- return projector;
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
}
- @Override
- public WritableBatch getWritableBatch() {
- return WritableBatch.get(this);
- }
-
private MaterializedField getMaterializedField(FieldReference reference, LogicalExpression expr){
return new MaterializedField(getFieldDef(reference, expr.getMajorType()));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
new file mode 100644
index 0000000..86d0d61
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Comparator.java
@@ -0,0 +1,11 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface Comparator {
+
+ public abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+ public abstract int doEval(int inIndex, int outIndex);
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
new file mode 100644
index 0000000..83d43b2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/ReadIndexRewriter.java
@@ -0,0 +1,87 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.IfExpression;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+
+import com.google.common.collect.Lists;
+
+public class ReadIndexRewriter implements ExprVisitor<LogicalExpression, String, RuntimeException> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReadIndexRewriter.class);
+
+ private String batchName;
+
+
+ @Override
+ public LogicalExpression visitUnknown(LogicalExpression e, String newIndexName) {
+ if (e instanceof ValueVectorReadExpression) {
+ ValueVectorReadExpression old = (ValueVectorReadExpression) e;
+ return new ValueVectorReadExpression(old.getTypedFieldId(), newIndexName);
+ } else {
+ throw new UnsupportedOperationException(String.format(
+ "ReadIndex rewriter doesn't know how to rewrite expression of type %s.", e.getClass().getName()));
+ }
+ }
+
+ @Override
+ public LogicalExpression visitFunctionCall(FunctionCall call, String newIndexName) {
+ List<LogicalExpression> args = Lists.newArrayList();
+ for (int i = 0; i < call.args.size(); ++i) {
+ LogicalExpression newExpr = call.args.get(i).accept(this, null);
+ args.add(newExpr);
+ }
+
+ return new FunctionCall(call.getDefinition(), args, call.getPosition());
+ }
+
+ @Override
+ public LogicalExpression visitIfExpression(IfExpression ifExpr, String newIndexName) {
+ List<IfExpression.IfCondition> conditions = Lists.newArrayList(ifExpr.iterator());
+ LogicalExpression newElseExpr = ifExpr.elseExpression.accept(this, null);
+
+ for (int i = 0; i < conditions.size(); ++i) {
+ IfExpression.IfCondition condition = conditions.get(i);
+
+ LogicalExpression newCondition = condition.condition.accept(this, null);
+ LogicalExpression newExpr = condition.expression.accept(this, null);
+ conditions.set(i, new IfExpression.IfCondition(newCondition, newExpr));
+ }
+
+ return IfExpression.newBuilder().setElse(newElseExpr).addConditions(conditions).build();
+ }
+
+ @Override
+ public LogicalExpression visitSchemaPath(SchemaPath path, String newIndexName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LogicalExpression visitLongConstant(LongExpression intExpr, String value) throws RuntimeException {
+ return intExpr;
+ }
+
+ @Override
+ public LogicalExpression visitDoubleConstant(DoubleExpression dExpr, String value) throws RuntimeException {
+ return dExpr;
+ }
+
+ @Override
+ public LogicalExpression visitBooleanConstant(BooleanExpression e, String value) throws RuntimeException {
+ return e;
+ }
+
+ @Override
+ public LogicalExpression visitQuotedStringConstant(QuotedString e, String value) throws RuntimeException {
+ return e;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
new file mode 100644
index 0000000..a21af09
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -0,0 +1,47 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import java.util.List;
+
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Holds the data for a particular record batch for later manipulation.
+ */
+class RecordBatchData {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchData.class);
+
+ final List<ValueVector> vectors = Lists.newArrayList();
+ final SelectionVector2 sv2;
+ final int recordCount;
+
+ public RecordBatchData(RecordBatch batch){
+ this.sv2 = batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE ? batch.getSelectionVector2().clone() : null;
+
+ for(VectorWrapper<?> v : batch){
+ if(v.isHyper()) throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
+ TransferPair tp = v.getValueVector().getTransferPair();
+ tp.transfer();
+ vectors.add(tp.getTo());
+ }
+
+ recordCount = batch.getRecordCount();
+ }
+
+ public int getRecordCount(){
+ return recordCount;
+ }
+ public List<ValueVector> getVectors() {
+ return vectors;
+ }
+
+ public SelectionVector2 getSv2() {
+ return sv2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
new file mode 100644
index 0000000..e361e38
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -0,0 +1,171 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import java.io.IOException;
+
+import org.apache.drill.common.defs.OrderDef;
+import org.apache.drill.common.defs.OrderDef.Direction;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.CodeGenerator.HoldingContainer;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.fn.impl.ComparatorFunctions;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import com.google.common.collect.ImmutableList;
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
+
+public class SortBatch extends AbstractRecordBatch<Sort> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatch.class);
+
+ private static long MAX_SORT_BYTES = 8l * 1024 * 1024 * 1024;
+
+ private final RecordBatch incoming;
+ private SortRecordBatchBuilder builder;
+ private SelectionVector4 sv4;
+ private Sorter sorter;
+ private BatchSchema schema;
+
+ public SortBatch(Sort popConfig, FragmentContext context, RecordBatch incoming) {
+ super(popConfig, context);
+ this.incoming = incoming;
+ }
+
+ @Override
+ public int getRecordCount() {
+ return sv4.getLength();
+ }
+
+ @Override
+ public void kill() {
+ incoming.kill();
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ return sv4;
+ }
+
+ @Override
+ public IterOutcome next() {
+ if(builder != null){
+ if(sv4.next()){
+ return IterOutcome.OK;
+ }else{
+ return IterOutcome.NONE;
+ }
+ }
+
+
+ try{
+ outer: while (true) {
+ IterOutcome upstream = incoming.next();
+ switch (upstream) {
+ case NONE:
+ break outer;
+ case NOT_YET:
+ case STOP:
+ container.clear();
+ return upstream;
+ case OK_NEW_SCHEMA:
+ // only change in the case that the schema truly changes. Artificial schema changes are ignored.
+ if(!incoming.getSchema().equals(schema)){
+ if (builder != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES, container);
+ this.schema = incoming.getSchema();
+ }
+ // fall through.
+ case OK:
+ if(!builder.add(incoming)){
+ throw new UnsupportedOperationException("Sort doesn't currently support doing an external sort.");
+ };
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ builder.build(context);
+ sv4 = builder.getSv4();
+
+ sorter = createNewSorter();
+ sorter.setup(context, this);
+ long t1 = System.nanoTime();
+ sorter.sort(sv4, container);
+ logger.debug("Sorted {} records in {} micros.", sv4.getTotalCount(), (System.nanoTime() - t1)/1000);
+
+ return IterOutcome.OK_NEW_SCHEMA;
+
+ }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ kill();
+ logger.error("Failure during query", ex);
+ context.fail(ex);
+ return IterOutcome.STOP;
+ }
+ }
+
+
+ private Sorter createNewSorter() throws ClassTransformationException, IOException, SchemaChangeException{
+ CodeGenerator<Sorter> g = new CodeGenerator<Sorter>(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+
+ for(OrderDef od : popConfig.getOrderings()){
+ // first, we rewrite the evaluation stack for each side of the comparison.
+ ErrorCollector collector = new ErrorCollectorImpl();
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), this, collector);
+ if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ ReadIndexRewriter rewriter = new ReadIndexRewriter();
+ LogicalExpression left = expr.accept(rewriter, "inIndex");
+ LogicalExpression right = expr.accept(rewriter, "outIndex");
+
+ // next we wrap the two comparison sides and add the expression block for the comparison.
+ FunctionCall f = new FunctionCall(ComparatorFunctions.COMPARE_TO, ImmutableList.of(left, right), ExpressionPosition.UNKNOWN);
+ HoldingContainer out = g.addExpr(f);
+ JConditional jc = g.getBlock()._if(out.getValue().ne(JExpr.lit(0)));
+
+ //TODO: is this the right order...
+ if(od.getDirection() == Direction.ASC){
+ jc._then()._return(out.getValue());
+ }else{
+ jc._then()._return(out.getValue().minus());
+ }
+ }
+
+ g.getBlock()._return(JExpr.lit(0));
+
+ return context.getImplementationClass(g);
+
+
+ }
+
+ @Override
+ public WritableBatch getWritableBatch() {
+ throw new UnsupportedOperationException("A sort batch is not writable.");
+ }
+
+ @Override
+ protected void killIncoming() {
+ incoming.kill();
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/db3afaa8/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
new file mode 100644
index 0000000..0a671a8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
@@ -0,0 +1,23 @@
+package org.apache.drill.exec.physical.impl.sort;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+public class SortBatchCreator implements BatchCreator<Sort>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatchCreator.class);
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.size() == 1);
+ return new SortBatch(config, context, children.iterator().next());
+ }
+
+
+}