Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:45:01 UTC

[23/27] git commit: implement hash expression evaluation

implement hash expression evaluation


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2c8094b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2c8094b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2c8094b4

Branch: refs/heads/master
Commit: 2c8094b420da075dec1cb224ff4ecb37f3d7f7f4
Parents: 98156ee
Author: Ben Becker <be...@gmail.com>
Authored: Mon Aug 12 02:27:24 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 18:31:31 2013 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/expr/fn/impl/Hash.java    |  5 +-
 .../partitionsender/OutgoingRecordBatch.java    |  9 ++--
 .../PartitionSenderRootExec.java                | 56 +++++++++++++++-----
 .../partitionsender/PartitionerTemplate.java    |  4 +-
 .../test/resources/sender/hash_exchange.json    |  2 +-
 5 files changed, 50 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c8094b4/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash.java
index 45a3ee4..2ffd389 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Hash.java
@@ -18,6 +18,7 @@
 
 package org.apache.drill.exec.expr.fn.impl;
 
+import com.google.common.hash.Hashing;
 import org.apache.drill.common.expression.*;
 import org.apache.drill.exec.expr.DrillFunc;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate;
@@ -38,8 +39,8 @@ public class Hash implements DrillFunc {
   }
 
   public void eval() {
-    // TODO: implement hash function for various types
-    out.value = (int)in.value;
+    // TODO: implement hash function for other types
+    out.value = Hashing.murmur3_128().hashLong(in.value).asInt();
   }
 
   public static class Provider implements CallProvider{

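For context, the changed eval() body above delegates to Guava's MurmurHash3 instead of truncating the input. A minimal standalone sketch of the same call chain (assumes Guava on the classpath; the class name and sample value are illustrative, not part of the commit):

    import com.google.common.hash.Hashing;

    public class HashLongSketch {
      public static void main(String[] args) {
        long in = 42L;
        // murmur3_128() returns a HashFunction; hashLong() hashes the 8-byte value
        // and asInt() takes the first four bytes of the 128-bit result.
        int out = Hashing.murmur3_128().hashLong(in).asInt();
        System.out.println(out);
      }
    }
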
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c8094b4/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 927cc75..bc1ef4e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -70,8 +70,9 @@ public class OutgoingRecordBatch implements RecordBatch {
     try {
       if (recordCount == recordCapacity) flush();
     } catch (SchemaChangeException e) {
-      // TODO:
-      logger.error("Unable to flush outgoing record batch: " + e);
+      incoming.kill();
+      logger.error("Error flushing outgoing batches", e);
+      context.fail(e);
     }
   }
 
@@ -86,7 +87,6 @@ public class OutgoingRecordBatch implements RecordBatch {
    * @throws SchemaChangeException
    */
   public boolean flush() throws SchemaChangeException {
-    logger.error("Creating FragmentWritableBatch.  IsLast? " + (isLast ? " (last batch)" : ""));
     final ExecProtos.FragmentHandle handle = context.getHandle();
 
     if (recordCount != 0) {
@@ -100,9 +100,7 @@ public class OutgoingRecordBatch implements RecordBatch {
       tunnel.sendRecordBatch(statusHandler, context, writableBatch);
     } else {
       logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
-
       if (isLast) {
-
         // if the last batch is empty, it must not contain any value vectors.
         vectorContainer = new VectorContainer();
 
@@ -116,7 +114,6 @@ public class OutgoingRecordBatch implements RecordBatch {
                                                                         getWritableBatch());
         tunnel.sendRecordBatch(statusHandler, context, writableBatch);
         return true;
-
       }
     }
 

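The flushIfNecessary() change above replaces a log-only TODO with a fail-fast path: kill the upstream batch, log the exception as a Throwable (so the stack trace is kept), and fail the fragment context. A minimal sketch of that pattern with stand-in types (Upstream and FragmentCtx below are placeholders for illustration, not the Drill interfaces):

    class SchemaChangeException extends Exception {}

    interface Upstream { void kill(); }
    interface FragmentCtx { void fail(Throwable t); }

    class OutgoingSketch {
      private final Upstream incoming;
      private final FragmentCtx context;

      OutgoingSketch(Upstream incoming, FragmentCtx context) {
        this.incoming = incoming;
        this.context = context;
      }

      void flushIfNecessary() {
        try {
          flush();                         // only flushes when the batch is full
        } catch (SchemaChangeException e) {
          incoming.kill();                 // stop pulling records from upstream
          System.err.println("Error flushing outgoing batches: " + e);
          context.fail(e);                 // surface the failure to the owning fragment
        }
      }

      void flush() throws SchemaChangeException { /* send the current batch */ }
    }
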
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c8094b4/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 476de7d..6d24e0b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -18,6 +18,7 @@
 
 package org.apache.drill.exec.physical.impl.partitionsender;
 
+import com.google.common.collect.Lists;
 import com.sun.codemodel.*;
 import org.apache.drill.common.expression.*;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -27,14 +28,15 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.physical.impl.filter.Filterer;
+import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
 import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.vector.TypeHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
 import java.io.IOException;
+import java.util.List;
 
 class PartitionSenderRootExec implements RootExec {
 
@@ -122,6 +124,29 @@ class PartitionSenderRootExec implements RootExec {
     ok = false;
     incoming.kill();
   }
+  
+  private void generatePartitionFunction() throws SchemaChangeException {
+
+    LogicalExpression filterExpression = operator.getExpr();
+    final ErrorCollector collector = new ErrorCollectorImpl();
+    final CodeGenerator<Partitioner> cg = new CodeGenerator<Partitioner>(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+
+    final LogicalExpression expr = ExpressionTreeMaterializer.materialize(filterExpression, incoming, collector);
+    if(collector.hasErrors()){
+      throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
+    }
+
+    cg.addExpr(new ReturnValueExpression(expr));
+    
+    try {
+      Partitioner p = context.getImplementationClass(cg);
+      p.setup(context, incoming, outgoing);
+    } catch (ClassTransformationException | IOException e) {
+      throw new SchemaChangeException("Failure while attempting to load generated class", e);
+    }
+
+
+  }
 
   private void createPartitioner() throws SchemaChangeException {
 
@@ -131,7 +156,7 @@ class PartitionSenderRootExec implements RootExec {
     final CodeGenerator<Partitioner> cg = new CodeGenerator<Partitioner>(Partitioner.TEMPLATE_DEFINITION,
                                                                          context.getFunctionRegistry());
 
-    final LogicalExpression logicalExp = ExpressionTreeMaterializer.materialize(expr, incoming, collector);
+    final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector);
     if (collector.hasErrors()) {
       throw new SchemaChangeException(String.format(
           "Failure while trying to materialize incoming schema.  Errors:\n %s.",
@@ -140,15 +165,18 @@ class PartitionSenderRootExec implements RootExec {
 
     // generate code to copy from an incoming value vector to the destination partition's outgoing value vector
     JExpression inIndex = JExpr.direct("inIndex");
-    JExpression outIndex = JExpr.direct("outIndex");
+    JExpression bucket = JExpr.direct("bucket");
     JType outgoingVectorArrayType = cg.getModel().ref(ValueVector.class).array().array();
     JType outgoingBatchArrayType = cg.getModel().ref(OutgoingRecordBatch.class).array();
-    cg.rotateBlock();
+
+    // generate evaluate expression to determine the hash
+    CodeGenerator.HoldingContainer exprHolder = cg.addExpr(materializedExpr);
+    cg.getBlock().decl(JType.parse(cg.getModel(), "int"), "bucket", exprHolder.getValue().mod(JExpr.lit(outgoing.length)));
 
     // declare and assign the array of outgoing record batches
     JVar outgoingBatches = cg.clazz.field(JMod.NONE,
-                                          outgoingBatchArrayType,
-                                          "outgoingBatches");
+        outgoingBatchArrayType,
+        "outgoingBatches");
     cg.getSetupBlock().assign(outgoingBatches, JExpr.direct("outgoing"));
 
     // declare a two-dimensional array of value vectors; batch is first dimension, ValueVector is the second
@@ -199,25 +227,25 @@ class PartitionSenderRootExec implements RootExec {
                                                        vvIn.getField().getType().getMode());
       JClass vvClass = cg.getModel().ref(vvType);
       // the following block generates calls to copyFrom(); e.g.:
-      // ((IntVector) outgoingVectors[outIndex][0]).copyFrom(inIndex,
-      //                                                     outgoingBatches[outIndex].getRecordCount(),
+      // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
+      //                                                     outgoingBatches[bucket].getRecordCount(),
       //                                                     vv1);
       cg.getBlock().add(
         ((JExpression) JExpr.cast(vvClass,
               ((JExpression)
                      outgoingVectors
-                       .component(outIndex))
+                       .component(bucket))
                        .component(JExpr.lit(fieldId))))
                        .invoke("copyFrom")
                        .arg(inIndex)
-                       .arg(((JExpression) outgoingBatches.component(outIndex)).invoke("getRecordCount"))
+                       .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
                        .arg(incomingVV));
 
       ++fieldId;
     }
     // generate the OutgoingRecordBatch helper invocations
-    cg.getBlock().add(((JExpression) outgoingBatches.component(outIndex)).invoke("incRecordCount"));
-    cg.getBlock().add(((JExpression) outgoingBatches.component(outIndex)).invoke("flushIfNecessary"));
+    cg.getBlock().add(((JExpression) outgoingBatches.component(bucket)).invoke("incRecordCount"));
+    cg.getBlock().add(((JExpression) outgoingBatches.component(bucket)).invoke("flushIfNecessary"));
     try {
       // compile and setup generated code
       partitioner = context.getImplementationClassMultipleOutput(cg);

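Taken together, the generated doEval() now evaluates the hash expression for the incoming record, reduces it to a bucket with hash % outgoing.length, and copies the record into that bucket's outgoing batch. A hand-written sketch of the same routing idea on plain Java collections (names and values are illustrative; the real code is emitted through CodeModel and copies between value vectors):

    import com.google.common.hash.Hashing;
    import java.util.ArrayList;
    import java.util.List;

    public class BucketRoutingSketch {
      public static void main(String[] args) {
        int numPartitions = 4;
        List<List<Long>> outgoing = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) outgoing.add(new ArrayList<>());

        long[] incoming = {3L, 17L, 42L, 99L, 1024L};
        for (long value : incoming) {
          int hash = Hashing.murmur3_128().hashLong(value).asInt();
          // floorMod keeps the index non-negative in this sketch, since asInt() may be negative
          int bucket = Math.floorMod(hash, numPartitions);
          outgoing.get(bucket).add(value);   // stand-in for the generated copyFrom(...) call
        }
        System.out.println(outgoing);
      }
    }
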
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c8094b4/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 4072b20..7198c3a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -43,12 +43,10 @@ public abstract class PartitionerTemplate implements Partitioner {
     for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
       // for each record
 
-      // TODO: evaluate partitioning expression
-      int partition = 0;
       // TODO: if attempting to insert too large of a value into a vector:
       //         - send the batch
       //         - reallocate (at least the size of the current value) and try again
-      doEval(recordId, partition);
+      doEval(recordId, 0);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2c8094b4/sandbox/prototype/exec/java-exec/src/test/resources/sender/hash_exchange.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/sender/hash_exchange.json b/sandbox/prototype/exec/java-exec/src/test/resources/sender/hash_exchange.json
index 78f3394..38116e7 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/sender/hash_exchange.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/sender/hash_exchange.json
@@ -28,7 +28,7 @@
             @id: 2,
             child: 1,
             pop: "hash-to-random-exchange",
-            expr: "hash(red)"
+            expr: "hash(1)"
         },
         {
              @id: 3,