You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:45:01 UTC
[23/27] git commit: implement hash expression evaluation
implement hash expression evaluation
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2c8094b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2c8094b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2c8094b4
Branch: refs/heads/master
Commit: 2c8094b420da075dec1cb224ff4ecb37f3d7f7f4
Parents: 98156ee
Author: Ben Becker <be...@gmail.com>
Authored: Mon Aug 12 02:27:24 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 18:31:31 2013 -0700
----------------------------------------------------------------------
.../apache/drill/exec/expr/fn/impl/Hash.java | 5 +-
.../partitionsender/OutgoingRecordBatch.java | 9 ++--
.../PartitionSenderRootExec.java | 56 +++++++++++++++-----
.../partitionsender/PartitionerTemplate.java | 4 +-
.../test/resources/sender/hash_exchange.json | 2 +-
5 files changed, 50 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c8094b4/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash.java
index 45a3ee4..2ffd389 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.expr.fn.impl;
+import com.google.common.hash.Hashing;
import org.apache.drill.common.expression.*;
import org.apache.drill.exec.expr.DrillFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
@@ -38,8 +39,8 @@ public class Hash implements DrillFunc {
}
public void eval() {
- // TODO: implement hash function for various types
- out.value = (int)in.value;
+ // TODO: implement hash function for other types
+ out.value = Hashing.murmur3_128().hashLong(in.value).asInt();
}
public static class Provider implements CallProvider{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c8094b4/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 927cc75..bc1ef4e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -70,8 +70,9 @@ public class OutgoingRecordBatch implements RecordBatch {
try {
if (recordCount == recordCapacity) flush();
} catch (SchemaChangeException e) {
- // TODO:
- logger.error("Unable to flush outgoing record batch: " + e);
+ incoming.kill();
+ logger.error("Error flushing outgoing batches", e);
+ context.fail(e);
}
}
@@ -86,7 +87,6 @@ public class OutgoingRecordBatch implements RecordBatch {
* @throws SchemaChangeException
*/
public boolean flush() throws SchemaChangeException {
- logger.error("Creating FragmentWritableBatch. IsLast? " + (isLast ? " (last batch)" : ""));
final ExecProtos.FragmentHandle handle = context.getHandle();
if (recordCount != 0) {
@@ -100,9 +100,7 @@ public class OutgoingRecordBatch implements RecordBatch {
tunnel.sendRecordBatch(statusHandler, context, writableBatch);
} else {
logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
-
if (isLast) {
-
// if the last batch is empty, it must not contain any value vectors.
vectorContainer = new VectorContainer();
@@ -116,7 +114,6 @@ public class OutgoingRecordBatch implements RecordBatch {
getWritableBatch());
tunnel.sendRecordBatch(statusHandler, context, writableBatch);
return true;
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c8094b4/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 476de7d..6d24e0b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.partitionsender;
+import com.google.common.collect.Lists;
import com.sun.codemodel.*;
import org.apache.drill.common.expression.*;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -27,14 +28,15 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.physical.impl.filter.Filterer;
+import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.*;
import org.apache.drill.exec.vector.TypeHelper;
import org.apache.drill.exec.vector.ValueVector;
import java.io.IOException;
+import java.util.List;
class PartitionSenderRootExec implements RootExec {
@@ -122,6 +124,29 @@ class PartitionSenderRootExec implements RootExec {
ok = false;
incoming.kill();
}
+
+ private void generatePartitionFunction() throws SchemaChangeException {
+
+ LogicalExpression filterExpression = operator.getExpr();
+ final ErrorCollector collector = new ErrorCollectorImpl();
+ final CodeGenerator<Partitioner> cg = new CodeGenerator<Partitioner>(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(filterExpression, incoming, collector);
+ if(collector.hasErrors()){
+ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
+ }
+
+ cg.addExpr(new ReturnValueExpression(expr));
+
+ try {
+ Partitioner p = context.getImplementationClass(cg);
+ p.setup(context, incoming, outgoing);
+ } catch (ClassTransformationException | IOException e) {
+ throw new SchemaChangeException("Failure while attempting to load generated class", e);
+ }
+
+
+ }
private void createPartitioner() throws SchemaChangeException {
@@ -131,7 +156,7 @@ class PartitionSenderRootExec implements RootExec {
final CodeGenerator<Partitioner> cg = new CodeGenerator<Partitioner>(Partitioner.TEMPLATE_DEFINITION,
context.getFunctionRegistry());
- final LogicalExpression logicalExp = ExpressionTreeMaterializer.materialize(expr, incoming, collector);
+ final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector);
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format(
"Failure while trying to materialize incoming schema. Errors:\n %s.",
@@ -140,15 +165,18 @@ class PartitionSenderRootExec implements RootExec {
// generate code to copy from an incoming value vector to the destination partition's outgoing value vector
JExpression inIndex = JExpr.direct("inIndex");
- JExpression outIndex = JExpr.direct("outIndex");
+ JExpression bucket = JExpr.direct("bucket");
JType outgoingVectorArrayType = cg.getModel().ref(ValueVector.class).array().array();
JType outgoingBatchArrayType = cg.getModel().ref(OutgoingRecordBatch.class).array();
- cg.rotateBlock();
+
+ // generate evaluate expression to determine the hash
+ CodeGenerator.HoldingContainer exprHolder = cg.addExpr(materializedExpr);
+ cg.getBlock().decl(JType.parse(cg.getModel(), "int"), "bucket", exprHolder.getValue().mod(JExpr.lit(outgoing.length)));
// declare and assign the array of outgoing record batches
JVar outgoingBatches = cg.clazz.field(JMod.NONE,
- outgoingBatchArrayType,
- "outgoingBatches");
+ outgoingBatchArrayType,
+ "outgoingBatches");
cg.getSetupBlock().assign(outgoingBatches, JExpr.direct("outgoing"));
// declare a two-dimensional array of value vectors; batch is first dimension, ValueVector is the second
@@ -199,25 +227,25 @@ class PartitionSenderRootExec implements RootExec {
vvIn.getField().getType().getMode());
JClass vvClass = cg.getModel().ref(vvType);
// the following block generates calls to copyFrom(); e.g.:
- // ((IntVector) outgoingVectors[outIndex][0]).copyFrom(inIndex,
- // outgoingBatches[outIndex].getRecordCount(),
+ // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
+ // outgoingBatches[bucket].getRecordCount(),
// vv1);
cg.getBlock().add(
((JExpression) JExpr.cast(vvClass,
((JExpression)
outgoingVectors
- .component(outIndex))
+ .component(bucket))
.component(JExpr.lit(fieldId))))
.invoke("copyFrom")
.arg(inIndex)
- .arg(((JExpression) outgoingBatches.component(outIndex)).invoke("getRecordCount"))
+ .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
.arg(incomingVV));
++fieldId;
}
// generate the OutgoingRecordBatch helper invocations
- cg.getBlock().add(((JExpression) outgoingBatches.component(outIndex)).invoke("incRecordCount"));
- cg.getBlock().add(((JExpression) outgoingBatches.component(outIndex)).invoke("flushIfNecessary"));
+ cg.getBlock().add(((JExpression) outgoingBatches.component(bucket)).invoke("incRecordCount"));
+ cg.getBlock().add(((JExpression) outgoingBatches.component(bucket)).invoke("flushIfNecessary"));
try {
// compile and setup generated code
partitioner = context.getImplementationClassMultipleOutput(cg);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c8094b4/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 4072b20..7198c3a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -43,12 +43,10 @@ public abstract class PartitionerTemplate implements Partitioner {
for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
// for each record
- // TODO: evaluate partitioning expression
- int partition = 0;
// TODO: if attempting to insert too large of a value into a vector:
// - send the batch
// - reallocate (at least the size of the current value) and try again
- doEval(recordId, partition);
+ doEval(recordId, 0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c8094b4/sandbox/prototype/exec/java-exec/src/test/resources/sender/hash_exchange.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/sender/hash_exchange.json b/sandbox/prototype/exec/java-exec/src/test/resources/sender/hash_exchange.json
index 78f3394..38116e7 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/sender/hash_exchange.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/sender/hash_exchange.json
@@ -28,7 +28,7 @@
@id: 2,
child: 1,
pop: "hash-to-random-exchange",
- expr: "hash(red)"
+ expr: "hash(1)"
},
{
@id: 3,