You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/11/12 05:11:25 UTC

[08/16] incubator-drill git commit: DRILL-1643, DRILL-1665: Flatten fixes - Fix repeated map vector to correctly report value count - Update flatten so init variables are reset for each new batch.

DRILL-1643, DRILL-1665: Flatten fixes
 - Fix repeated map vector to correctly report value count
 - Update flatten so init variables are reset for each new batch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ed962497
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ed962497
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ed962497

Branch: refs/heads/master
Commit: ed962497f04d432591b33b7532741b07ab46fbfe
Parents: 761156b
Author: Jason Altekruse <al...@gmail.com>
Authored: Wed Nov 5 18:11:22 2014 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Nov 11 16:48:45 2014 -0800

----------------------------------------------------------------------
 .../impl/flatten/FlattenRecordBatch.java        |   2 +
 .../physical/impl/flatten/FlattenTemplate.java  |  40 +++--
 .../exec/physical/impl/flatten/Flattener.java   |   1 +
 .../exec/vector/complex/RepeatedMapVector.java  |   9 +-
 .../exec/physical/impl/flatten/TestFlatten.java |  99 +++++++++++++
 .../store/json/test_flatten_mappify2.json       | 148 +++++++++++++++++++
 6 files changed, 286 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 129174e..66c6168 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -161,6 +161,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       this.recordCount = remainderIndex;
     } else {
       setValueCount(outputRecords);
+      flattener.resetGroupIndex();
       for(VectorWrapper<?> v: incoming) {
         v.clear();
       }
@@ -194,6 +195,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       for (VectorWrapper<?> v : incoming) {
         v.clear();
       }
+      flattener.resetGroupIndex();
       this.recordCount = remainingRecordCount;
     }
     // In case of complex writer expression, vectors would be added to batch run-time.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index af4cead..c5d3d93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -30,6 +30,8 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
 import com.google.common.collect.ImmutableList;
+
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector.RepeatedAccessor;
 import org.apache.drill.exec.vector.RepeatedVector;
 
 public abstract class FlattenTemplate implements Flattener {
@@ -40,7 +42,9 @@ public abstract class FlattenTemplate implements Flattener {
   private SelectionVector4 vector4;
   private SelectionVectorMode svMode;
   RepeatedVector fieldToFlatten;
+  RepeatedAccessor accessor;
   private int groupIndex;
+
   // this allows for groups to be written between batches if we run out of space, for cases where we have finished
   // a batch on the boundary it will be set to 0
   private int childIndexWithinCurrGroup;
@@ -56,6 +60,7 @@ public abstract class FlattenTemplate implements Flattener {
   @Override
   public void setFlattenField(RepeatedVector flattenField) {
     this.fieldToFlatten = flattenField;
+    this.accessor = flattenField.getAccessor();
   }
 
   public RepeatedVector getFlattenField() {
@@ -76,18 +81,26 @@ public abstract class FlattenTemplate implements Flattener {
         if (childIndexWithinCurrGroup == -1) {
           childIndexWithinCurrGroup = 0;
         }
-        outer:
-        for ( ; groupIndex < fieldToFlatten.getAccessor().getGroupCount(); groupIndex++) {
-          currGroupSize = fieldToFlatten.getAccessor().getGroupSizeAtIndex(groupIndex);
-          for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) {
-            if (!doEval(groupIndex, firstOutputIndex)) {
-              break outer;
+        outer: {
+          final int groupCount = accessor.getGroupCount();
+          for ( ; groupIndex < groupCount; groupIndex++) {
+            currGroupSize = accessor.getGroupSizeAtIndex(groupIndex);
+            for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) {
+              if (!doEval(groupIndex, firstOutputIndex)) {
+                break outer;
+              }
+              firstOutputIndex++;
+              childIndex++;
             }
-            firstOutputIndex++;
-            childIndex++;
+            childIndexWithinCurrGroup = 0;
           }
-          childIndexWithinCurrGroup = 0;
         }
+//        System.out.println(String.format("startIndex %d, recordCount %d, firstOutputIndex: %d, currGroupSize: %d, childIndexWithinCurrGroup: %d, groupIndex: %d", startIndex, recordCount, firstOutputIndex, currGroupSize, childIndexWithinCurrGroup, groupIndex));
+//        try{
+////          Thread.sleep(1000);
+//        }catch(Exception e){
+//
+//        }
 
         for (TransferPair t : transfers) {
           t.splitAndTransfer(startIndex, childIndex - startIndex);
@@ -113,6 +126,15 @@ public abstract class FlattenTemplate implements Flattener {
     doSetup(context, incoming, outgoing);
   }
 
+
+
+  @Override
+  public void resetGroupIndex() {
+    this.groupIndex = 0;
+    this.currGroupSize = 0;
+    this.childIndex = 0;
+  }
+
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
index 49b9c1b..2141ca2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -32,6 +32,7 @@ public interface Flattener {
   public abstract int flattenRecords(int startIndex, int recordCount, int firstOutputIndex);
   public void setFlattenField(RepeatedVector repeatedColumn);
   public RepeatedVector getFlattenField();
+  public void resetGroupIndex();
 
   public static TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 99b9453..9b7011c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -420,7 +420,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
   public void load(SerializedField metadata, DrillBuf buf) {
     List<SerializedField> fields = metadata.getChildList();
 
-    int bufOffset = offsets.load(metadata.getValueCount()+1, buf);
+    int bufOffset = offsets.load(metadata.getGroupCount()+1, buf);
 
     for (SerializedField fmd : fields) {
       MaterializedField fieldDef = MaterializedField.create(fmd);
@@ -444,7 +444,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     SerializedField.Builder b = getField() //
         .getAsBuilder() //
         .setBufferLength(getBufferSize()) //
-        .setValueCount(accessor.getValueCount());
+        .setGroupCount(accessor.getGroupCount());
     for (ValueVector v : vectors.values()) {
       b.addChild(v.getMetadata());
     }
@@ -489,7 +489,8 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
 
     @Override
     public int getValueCount() {
-      return offsets.getAccessor().getValueCount() - 1;
+      return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1);
+//      return offsets.getAccessor().getValueCount() - 1;
     }
 
     public int getGroupSizeAtIndex(int index) {
@@ -542,7 +543,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
 
     @Override
     public int getGroupCount() {
-      return size();
+      return offsets.getAccessor().getValueCount() - 1;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
index 9514517..d4c19a3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -22,21 +22,120 @@ import org.junit.Test;
 
 public class TestFlatten extends BaseTestQuery {
 
+  /**
+   *  enable this if you have the following files:
+   *    - /tmp/yelp_academic_dataset_business.json
+   *    - /tmp/mapkv.json
+   *    - /tmp/drill1665.json
+   */
+  public static boolean RUN_ADVANCED_TESTS = false;
+
+
+  @Test
+  public void testFlattenFailure() throws Exception {
+    test("select flatten(complex), rownum from cp.`/store/json/test_flatten_mappify2.json`");
+//    test("select complex, rownum from cp.`/store/json/test_flatten_mappify2.json`");
+  }
+
   @Test
   public void testKVGenFlatten1() throws Exception {
+    // works - TODO: verify results
     test("select flatten(kvgen(f1)) as monkey, x " +
         "from cp.`/store/json/test_flatten_mapify.json`");
   }
 
   @Test
   public void testTwoFlattens() throws Exception {
+    // second re-write rule has been added to test the fixes together, this now runs
     test("select `integer`, `float`, x, flatten(z), flatten(l) from cp.`/jsoninput/input2_modified.json`");
   }
 
   @Test
+  public void testFlattenRepeatedMap() throws Exception {
+    test("select `integer`, `float`, x, flatten(z) from cp.`/jsoninput/input2.json`");
+  }
+
+  @Test
+  public void testFlattenKVGenFlatten() throws Exception {
+    // currently does not fail, but produces incorrect results, requires second re-write rule to split up expressions
+    // with complex outputs
+    test("select `integer`, `float`, x, flatten(kvgen(flatten(z))) from cp.`/jsoninput/input2.json`");
+  }
+
+  @Test
+  public void testKVGenFlatten2() throws Exception {
+    // currently runs
+    // TODO - re-verify results by hand
+    if(RUN_ADVANCED_TESTS){
+      test("select flatten(kvgen(visited_cellid_counts)) as mytb from dfs.`/tmp/mapkv.json`") ;
+    }
+  }
+
+  @Test
   public void testFilterFlattenedRecords() throws Exception {
+    // WORKS!!
+    // TODO - hand verify results
     test("select t2.key from (select t.monkey.`value` as val, t.monkey.key as key from (select flatten(kvgen(f1)) as monkey, x " +
         "from cp.`/store/json/test_flatten_mapify.json`) as t) as t2 where t2.val > 1");
   }
 
+  @Test
+  public void testFilterFlattenedRecords2() throws Exception {
+    // previously failed in generated code
+    //  "value" is neither a method, a field, nor a member class of "org.apache.drill.exec.expr.holders.RepeatedVarCharHolder" [ 42eb1fa1-0742-4e4f-8723-609215c18900 on 10.250.0.86:31010 ]
+    // appears to be resolving the data coming out of flatten as repeated, check fast schema stuff
+
+    // FIXED BY RETURNING PROPER SCHEMA DURING FAST SCHEMA STEP
+    // these types of problems are being solved more generally as we develop better support for changing schema
+    if(RUN_ADVANCED_TESTS){
+      test("select celltbl.catl from (\n" +
+          "        select flatten(categories) catl from dfs.`/tmp/yelp_academic_dataset_business.json` b limit 100\n" +
+          "    )  celltbl where celltbl.catl = 'Doctors'");
+    }
+  }
+
+  @Test
+  public void countAggFlattened() throws Exception {
+    if(RUN_ADVANCED_TESTS){
+      test("select celltbl.catl, count(celltbl.catl) from ( " +
+          "select business_id, flatten(categories) catl from dfs.`/tmp/yelp_academic_dataset_business.json` b limit 100 " +
+          ")  celltbl group by celltbl.catl limit 10 ");
+    }
+  }
+
+
+  @Test
+  public void flattenAndAdditionalColumn() throws Exception {
+    if(RUN_ADVANCED_TESTS){
+      test("select business_id, flatten(categories) from dfs.`/tmp/yelp_academic_dataset_business.json` b");
+    }
+  }
+
+  @Test
+  public void testFailingFlattenAlone() throws Exception {
+    if(RUN_ADVANCED_TESTS){
+      test("select flatten(categories) from dfs.`/tmp/yelp_academic_dataset_business.json` b  ");
+    }
+  }
+
+  @Test
+  public void testDistinctAggrFlattened() throws Exception {
+    if(RUN_ADVANCED_TESTS){
+      test(" select distinct(celltbl.catl) from (\n" +
+          "        select flatten(categories) catl from dfs.`/tmp/yelp_academic_dataset_business.json` b\n" +
+          "    )  celltbl");
+    }
+
+  }
+
+  @Test
+  public void testDrill1665() throws Exception {
+    if(RUN_ADVANCED_TESTS){
+      test("select id, flatten(evnts) as rpt from dfs.`/tmp/drill1665.json`");
+    }
+
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json b/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json
new file mode 100644
index 0000000..f53a0c7
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json
@@ -0,0 +1,148 @@
+{
+    "rownum": 1,
+    "bigintegercol": {
+        "int_1": 1,
+        "int_2": 2,
+        "int_3": 3
+    },
+    "varcharcol": {
+        "varchar_1": "abc",
+        "varchar_2": "def",
+        "varchar_3": "xyz"
+    },
+    "boolcol": {
+        "boolean_1": true,
+        "boolean_2": false,
+        "boolean_3": true
+    },
+    "float8col": {
+        "f8_1": 1.1,
+        "f8_2": 2.2
+    },
+    "complex": [
+        {
+            "col1": 3
+        },
+        {
+            "col2": 2,
+            "col3": 1
+        },
+        {
+            "col1": 7
+        }
+    ]
+}
+{
+    "rownum": 2,
+    "bigintegercol": {
+        "int_1": 1,
+        "int_2": 2
+    },
+    "varcharcol": {
+        "varchar_1": "abcd"
+    },
+    "boolcol": {
+        "boolean_1": true
+    },
+    "float8col": {
+        "f8_1": 1.1,
+        "f8_2": 2.2,
+        "f8_3": 3.3
+    },
+    "complex": [
+        {
+            "col2": 2,
+            "col3": 1
+        },
+        {
+            "col1": 7
+        }
+    ]
+}
+{
+    "rownum": 3,
+    "bigintegercol": {
+        "int_1": 1,
+        "int_3": 3
+    },
+    "varcharcol": {
+        "varchar_1": "abcde",
+        "varchar_2": null,
+        "varchar_3": "xyz",
+        "varchar_4": "xyz2"
+    },
+    "boolcol": {
+        "boolean_1": true,
+        "boolean_2": false
+    },
+    "float8col": {
+        "f8_1": 1.1,
+        "f8_3": 6.6
+    },
+    "complex": [
+        {
+            "col1": 2,
+            "col3": 1
+        }
+    ]
+}
+{
+    "rownum": 4,
+    "bigintegercol": {
+        "int_2": 2,
+        "int_3": 3
+    },
+    "varcharcol": {
+        "varchar_1": "abc",
+        "varchar_2": "def"
+    },
+    "boolcol": {
+        "boolean_1": true,
+        "boolean_2": false,
+        "boolean_3": null
+    },
+    "float8col": {
+        "f8_1": 1.1,
+        "f8_2": 2.2
+    },
+    "complex": [
+        {
+            "col1": 3,
+            "col2": 2
+        },
+        {
+            "col3": 1,
+            "col1": 7
+        }
+    ]
+}
+{
+    "rownum": 5,
+    "bigintegercol": {
+        "int_2": 2,
+        "int_3": 3
+    },
+    "varcharcol": {
+        "varchar_1": "abc",
+        "varchar_2": "def"
+    },
+    "boolcol": {
+        "boolean_1": true,
+        "boolean_2": false,
+        "boolean_3": null
+    },
+    "float8col": {
+        "f8_1": 1.1,
+        "f8_2": 2.2
+    },
+    "complex": [
+        {
+            "col1": 3,
+            "col2": 2
+        },
+        {
+            "col3": 1,
+            "col1": 7
+        }
+    ]
+}