You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/11/12 05:11:25 UTC
[08/16] incubator-drill git commit: DRILL-1643, DRILL-1665: Flatten fixes - Fix repeated map vector to correctly report value count - Update flatten so init variables are reset for each new batch.
DRILL-1643, DRILL-1665: Flatten fixes
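- Fix repeated map vector to correctly report its value count and group count (including the group count written to and read from serialized metadata)
- Update flatten so its iteration state (group index, current group size, child index) is reset for each new incoming batch.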
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/ed962497
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/ed962497
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/ed962497
Branch: refs/heads/master
Commit: ed962497f04d432591b33b7532741b07ab46fbfe
Parents: 761156b
Author: Jason Altekruse <al...@gmail.com>
Authored: Wed Nov 5 18:11:22 2014 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Nov 11 16:48:45 2014 -0800
----------------------------------------------------------------------
.../impl/flatten/FlattenRecordBatch.java | 2 +
.../physical/impl/flatten/FlattenTemplate.java | 40 +++--
.../exec/physical/impl/flatten/Flattener.java | 1 +
.../exec/vector/complex/RepeatedMapVector.java | 9 +-
.../exec/physical/impl/flatten/TestFlatten.java | 99 +++++++++++++
.../store/json/test_flatten_mappify2.json | 148 +++++++++++++++++++
6 files changed, 286 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 129174e..66c6168 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -161,6 +161,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
this.recordCount = remainderIndex;
} else {
setValueCount(outputRecords);
+ flattener.resetGroupIndex();
for(VectorWrapper<?> v: incoming) {
v.clear();
}
@@ -194,6 +195,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
for (VectorWrapper<?> v : incoming) {
v.clear();
}
+ flattener.resetGroupIndex();
this.recordCount = remainingRecordCount;
}
// In case of complex writer expression, vectors would be added to batch run-time.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index af4cead..c5d3d93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -30,6 +30,8 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import com.google.common.collect.ImmutableList;
+
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector.RepeatedAccessor;
import org.apache.drill.exec.vector.RepeatedVector;
public abstract class FlattenTemplate implements Flattener {
@@ -40,7 +42,9 @@ public abstract class FlattenTemplate implements Flattener {
private SelectionVector4 vector4;
private SelectionVectorMode svMode;
RepeatedVector fieldToFlatten;
+ RepeatedAccessor accessor;
private int groupIndex;
+
// this allows for groups to be written between batches if we run out of space, for cases where we have finished
// a batch on the boundary it will be set to 0
private int childIndexWithinCurrGroup;
@@ -56,6 +60,7 @@ public abstract class FlattenTemplate implements Flattener {
@Override
public void setFlattenField(RepeatedVector flattenField) {
this.fieldToFlatten = flattenField;
+ this.accessor = flattenField.getAccessor();
}
public RepeatedVector getFlattenField() {
@@ -76,18 +81,26 @@ public abstract class FlattenTemplate implements Flattener {
if (childIndexWithinCurrGroup == -1) {
childIndexWithinCurrGroup = 0;
}
- outer:
- for ( ; groupIndex < fieldToFlatten.getAccessor().getGroupCount(); groupIndex++) {
- currGroupSize = fieldToFlatten.getAccessor().getGroupSizeAtIndex(groupIndex);
- for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) {
- if (!doEval(groupIndex, firstOutputIndex)) {
- break outer;
+ outer: {
+ final int groupCount = accessor.getGroupCount();
+ for ( ; groupIndex < groupCount; groupIndex++) {
+ currGroupSize = accessor.getGroupSizeAtIndex(groupIndex);
+ for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) {
+ if (!doEval(groupIndex, firstOutputIndex)) {
+ break outer;
+ }
+ firstOutputIndex++;
+ childIndex++;
}
- firstOutputIndex++;
- childIndex++;
+ childIndexWithinCurrGroup = 0;
}
- childIndexWithinCurrGroup = 0;
}
+// System.out.println(String.format("startIndex %d, recordCount %d, firstOutputIndex: %d, currGroupSize: %d, childIndexWithinCurrGroup: %d, groupIndex: %d", startIndex, recordCount, firstOutputIndex, currGroupSize, childIndexWithinCurrGroup, groupIndex));
+// try{
+//// Thread.sleep(1000);
+// }catch(Exception e){
+//
+// }
for (TransferPair t : transfers) {
t.splitAndTransfer(startIndex, childIndex - startIndex);
@@ -113,6 +126,15 @@ public abstract class FlattenTemplate implements Flattener {
doSetup(context, incoming, outgoing);
}
+
+
+ @Override
+ public void resetGroupIndex() {
+ this.groupIndex = 0;
+ this.currGroupSize = 0;
+ this.childIndex = 0;
+ }
+
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
index 49b9c1b..2141ca2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -32,6 +32,7 @@ public interface Flattener {
public abstract int flattenRecords(int startIndex, int recordCount, int firstOutputIndex);
public void setFlattenField(RepeatedVector repeatedColumn);
public RepeatedVector getFlattenField();
+ public void resetGroupIndex();
public static TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 99b9453..9b7011c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -420,7 +420,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
public void load(SerializedField metadata, DrillBuf buf) {
List<SerializedField> fields = metadata.getChildList();
- int bufOffset = offsets.load(metadata.getValueCount()+1, buf);
+ int bufOffset = offsets.load(metadata.getGroupCount()+1, buf);
for (SerializedField fmd : fields) {
MaterializedField fieldDef = MaterializedField.create(fmd);
@@ -444,7 +444,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
SerializedField.Builder b = getField() //
.getAsBuilder() //
.setBufferLength(getBufferSize()) //
- .setValueCount(accessor.getValueCount());
+ .setGroupCount(accessor.getGroupCount());
for (ValueVector v : vectors.values()) {
b.addChild(v.getMetadata());
}
@@ -489,7 +489,8 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
@Override
public int getValueCount() {
- return offsets.getAccessor().getValueCount() - 1;
+ return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1);
+// return offsets.getAccessor().getValueCount() - 1;
}
public int getGroupSizeAtIndex(int index) {
@@ -542,7 +543,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
@Override
public int getGroupCount() {
- return size();
+ return offsets.getAccessor().getValueCount() - 1;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
index 9514517..d4c19a3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java
@@ -22,21 +22,120 @@ import org.junit.Test;
public class TestFlatten extends BaseTestQuery {
+ /**
+ * enable this if you have the following files:
+ * - /tmp/yelp_academic_dataset_business.json
+ * - /tmp/mapkv.json
+ * - /tmp/drill1665.json
+ */
+ public static boolean RUN_ADVANCED_TESTS = false;
+
+
+ @Test
+ public void testFlattenFailure() throws Exception {
+ test("select flatten(complex), rownum from cp.`/store/json/test_flatten_mappify2.json`");
+// test("select complex, rownum from cp.`/store/json/test_flatten_mappify2.json`");
+ }
+
@Test
public void testKVGenFlatten1() throws Exception {
+ // works - TODO and verify results
test("select flatten(kvgen(f1)) as monkey, x " +
"from cp.`/store/json/test_flatten_mapify.json`");
}
@Test
public void testTwoFlattens() throws Exception {
+ // second re-write rule has been added to test the fixes together, this now runs
test("select `integer`, `float`, x, flatten(z), flatten(l) from cp.`/jsoninput/input2_modified.json`");
}
@Test
+ public void testFlattenRepeatedMap() throws Exception {
+ test("select `integer`, `float`, x, flatten(z) from cp.`/jsoninput/input2.json`");
+ }
+
+ @Test
+ public void testFlattenKVGenFlatten() throws Exception {
+ // currently does not fail, but produces incorrect results, requires second re-write rule to split up expressions
+ // with complex outputs
+ test("select `integer`, `float`, x, flatten(kvgen(flatten(z))) from cp.`/jsoninput/input2.json`");
+ }
+
+ @Test
+ public void testKVGenFlatten2() throws Exception {
+ // currently runs
+ // TODO - re-verify results by hand
+ if(RUN_ADVANCED_TESTS){
+ test("select flatten(kvgen(visited_cellid_counts)) as mytb from dfs.`/tmp/mapkv.json`") ;
+ }
+ }
+
+ @Test
public void testFilterFlattenedRecords() throws Exception {
+ // WORKS!!
+ // TODO - hand verify results
test("select t2.key from (select t.monkey.`value` as val, t.monkey.key as key from (select flatten(kvgen(f1)) as monkey, x " +
"from cp.`/store/json/test_flatten_mapify.json`) as t) as t2 where t2.val > 1");
}
+ @Test
+ public void testFilterFlattenedRecords2() throws Exception {
+ // Previously failed in generated code with the following error:
+ // "value" is neither a method, a field, nor a member class of "org.apache.drill.exec.expr.holders.RepeatedVarCharHolder" [ 42eb1fa1-0742-4e4f-8723-609215c18900 on 10.250.0.86:31010 ]
+ // The data coming out of flatten appeared to be resolved as a repeated type; see the fast schema step.
+
+ // Fixed by returning the proper schema during the fast schema step.
+ // These types of problems are being solved more generally as we develop better support for changing schema.
+ if(RUN_ADVANCED_TESTS){
+ test("select celltbl.catl from (\n" +
+ " select flatten(categories) catl from dfs.`/tmp/yelp_academic_dataset_business.json` b limit 100\n" +
+ " ) celltbl where celltbl.catl = 'Doctors'");
+ }
+ }
+
+ @Test
+ public void countAggFlattened() throws Exception {
+ if(RUN_ADVANCED_TESTS){
+ test("select celltbl.catl, count(celltbl.catl) from ( " +
+ "select business_id, flatten(categories) catl from dfs.`/tmp/yelp_academic_dataset_business.json` b limit 100 " +
+ ") celltbl group by celltbl.catl limit 10 ");
+ }
+ }
+
+
+ @Test
+ public void flattenAndAdditionalColumn() throws Exception {
+ if(RUN_ADVANCED_TESTS){
+ test("select business_id, flatten(categories) from dfs.`/tmp/yelp_academic_dataset_business.json` b");
+ }
+ }
+
+ @Test
+ public void testFailingFlattenAlone() throws Exception {
+ if(RUN_ADVANCED_TESTS){
+ test("select flatten(categories) from dfs.`/tmp/yelp_academic_dataset_business.json` b ");
+ }
+ }
+
+ @Test
+ public void testDistinctAggrFlattened() throws Exception {
+ if(RUN_ADVANCED_TESTS){
+ test(" select distinct(celltbl.catl) from (\n" +
+ " select flatten(categories) catl from dfs.`/tmp/yelp_academic_dataset_business.json` b\n" +
+ " ) celltbl");
+ }
+
+ }
+
+ @Test
+ public void testDrill1665() throws Exception {
+ if(RUN_ADVANCED_TESTS){
+ test("select id, flatten(evnts) as rpt from dfs.`/tmp/drill1665.json`");
+ }
+
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/ed962497/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json b/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json
new file mode 100644
index 0000000..f53a0c7
--- /dev/null
+++ b/exec/java-exec/src/test/resources/store/json/test_flatten_mappify2.json
@@ -0,0 +1,148 @@
+{
+ "rownum": 1,
+ "bigintegercol": {
+ "int_1": 1,
+ "int_2": 2,
+ "int_3": 3
+ },
+ "varcharcol": {
+ "varchar_1": "abc",
+ "varchar_2": "def",
+ "varchar_3": "xyz"
+ },
+ "boolcol": {
+ "boolean_1": true,
+ "boolean_2": false,
+ "boolean_3": true
+ },
+ "float8col": {
+ "f8_1": 1.1,
+ "f8_2": 2.2
+ },
+ "complex": [
+ {
+ "col1": 3
+ },
+ {
+ "col2": 2,
+ "col3": 1
+ },
+ {
+ "col1": 7
+ }
+ ]
+}
+{
+ "rownum": 2,
+ "bigintegercol": {
+ "int_1": 1,
+ "int_2": 2
+ },
+ "varcharcol": {
+ "varchar_1": "abcd"
+ },
+ "boolcol": {
+ "boolean_1": true
+ },
+ "float8col": {
+ "f8_1": 1.1,
+ "f8_2": 2.2,
+ "f8_3": 3.3
+ },
+ "complex": [
+ {
+ "col2": 2,
+ "col3": 1
+ },
+ {
+ "col1": 7
+ }
+ ]
+}
+{
+ "rownum": 3,
+ "bigintegercol": {
+ "int_1": 1,
+ "int_3": 3
+ },
+ "varcharcol": {
+ "varchar_1": "abcde",
+ "varchar_2": null,
+ "varchar_3": "xyz",
+ "varchar_4": "xyz2"
+ },
+ "boolcol": {
+ "boolean_1": true,
+ "boolean_2": false
+ },
+ "float8col": {
+ "f8_1": 1.1,
+ "f8_3": 6.6
+ },
+ "complex": [
+ {
+ "col1": 2,
+ "col3": 1
+ }
+ ]
+}
+{
+ "rownum": 4,
+ "bigintegercol": {
+ "int_2": 2,
+ "int_3": 3
+ },
+ "varcharcol": {
+ "varchar_1": "abc",
+ "varchar_2": "def"
+ },
+ "boolcol": {
+ "boolean_1": true,
+ "boolean_2": false,
+ "boolean_3": null
+ },
+ "float8col": {
+ "f8_1": 1.1,
+ "f8_2": 2.2
+ },
+ "complex": [
+ {
+ "col1": 3,
+ "col2": 2
+ },
+ {
+ "col3": 1,
+ "col1": 7
+ }
+ ]
+}
+{
+ "rownum": 5,
+ "bigintegercol": {
+ "int_2": 2,
+ "int_3": 3
+ },
+ "varcharcol": {
+ "varchar_1": "abc",
+ "varchar_2": "def"
+ },
+ "boolcol": {
+ "boolean_1": true,
+ "boolean_2": false,
+ "boolean_3": null
+ },
+ "float8col": {
+ "f8_1": 1.1,
+ "f8_2": 2.2
+ },
+ "complex": [
+ {
+ "col1": 3,
+ "col2": 2
+ },
+ {
+ "col3": 1,
+ "col1": 7
+ }
+ ]
+}