You are viewing a plain-text version of this content; the hyperlink to the canonical (HTML) version was not preserved in this rendering.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 05:38:33 UTC
[2/2] git commit: Updates for Parquet varlen merge
Updates for Parquet varlen merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c5013525
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c5013525
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c5013525
Branch: refs/heads/master
Commit: c501352565d1c8e39e1f6e7e66d9fa3a0fe2bf9e
Parents: 556bd96
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Aug 15 20:38:04 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 20:38:04 2013 -0700
----------------------------------------------------------------------
.../exec/store/parquet/ParquetGroupScan.java | 34 ++++++-----
.../store/parquet/ParquetRecordReaderTest.java | 61 +++++++++++++++-----
.../exec/store/parquet/TestFileGenerator.java | 8 ++-
.../resources/parquet/parquet_scan_screen.json | 44 ++++++++++++++
.../parquet_scan_screen_read_entry_replace.json | 39 +++++++++++++
.../parquet_scan_union_screen_physical.json | 35 +++++++++++
.../src/test/resources/parquet_scan_screen.json | 44 --------------
.../parquet_scan_screen_read_entry_replace.json | 39 -------------
.../parquet_scan_union_screen_physical.json | 35 -----------
9 files changed, 186 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 66c1550..9e48d33 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,16 +18,14 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.exception.SetupException;
import org.apache.drill.exec.physical.EndpointAffinity;
@@ -38,13 +36,9 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.StorageEngineRegistry;
import org.apache.drill.exec.store.AffinityCalculator;
-import org.apache.drill.exec.store.mock.MockGroupScanPOP;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -54,6 +48,12 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.org.codehaus.jackson.annotate.JsonCreator;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
@JsonTypeName("parquet-scan")
public class ParquetGroupScan extends AbstractGroupScan {
@@ -116,8 +116,10 @@ public class ParquetGroupScan extends AbstractGroupScan {
ColumnChunkMetaData columnChunkMetaData;
for (ReadEntryWithPath readEntryWithPath : entries){
Path path = new Path(readEntryWithPath.getPath());
-
ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
+// FileSystem fs = FileSystem.get(this.storageEngine.getHadoopConfig());
+// FileStatus status = fs.getFileStatus(path);
+// ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
readEntryWithPath.getPath();
int i = 0;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 5628f50..1d91455 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.types.TypeProtos;
@@ -46,6 +47,7 @@ import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.junit.BeforeClass;
import org.junit.Test;
import parquet.bytes.BytesInput;
@@ -57,6 +59,7 @@ import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;
import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.util.concurrent.SettableFuture;
@@ -66,23 +69,38 @@ public class ParquetRecordReaderTest {
private boolean VERBOSE_DEBUG = false;
-
- public static void main(String[] args) throws Exception{
- new ParquetRecordReaderTest().testMultipleRowGroupsAndReadsEvent();
- }
+ static final int numberRowGroups = 20;
+ static final int recordsPerRowGroup = 300000;
+ static final String fileName = "/tmp/parquet_test_file_many_types";
-
- @Test
- public void testMultipleRowGroupsAndReadsEvent() throws Exception {
- String planName = "/parquet_scan_screen.json";
- String fileName = "/tmp/parquet_test_file_many_types";
- int numberRowGroups = 20;
- int recordsPerRowGroup = 300000;
+ @BeforeClass
+ public static void generateFile() throws Exception{
File f = new File(fileName);
if(!f.exists()) TestFileGenerator.generateParquetFile(fileName, numberRowGroups, recordsPerRowGroup);
- testParquetFullEngineLocal(planName, fileName, 2, numberRowGroups, recordsPerRowGroup);
+ }
+
+ @Test
+ public void testMultipleRowGroupsAndReads() throws Exception {
+ String planName = "/parquet/parquet_scan_screen.json";
+ testParquetFullEngineLocalPath(planName, fileName, 2, numberRowGroups, recordsPerRowGroup);
+ }
+
+ @Test
+ public void testMultipleRowGroupsAndReads2() throws Exception {
+ String readEntries;
+ readEntries = "";
+ // number of times to read the file
+ int i = 3;
+ for (int j = 0; j < i; j++){
+ readEntries += "{path: \""+fileName+"\"}";
+ if (j < i - 1)
+ readEntries += ",";
+ }
+ String planText = Files.toString(FileUtils.getResourceAsFile("/parquet/parquet_scan_screen_read_entry_replace.json"), Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries);
+ testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup);
}
+
private class ParquetResultListener implements UserResultsListener {
private SettableFuture<Void> future = SettableFuture.create();
RecordBatchLoader batchLoader;
@@ -198,6 +216,8 @@ public class ParquetRecordReaderTest {
}
+
+
public void testParquetFullEngineRemote(String plan, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
DrillConfig config = DrillConfig.create();
@@ -212,8 +232,13 @@ public class ParquetRecordReaderTest {
}
- // specific tests should call this method, but it is not marked as a test itself intentionally
- public void testParquetFullEngineLocal(String plan, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
+
+ public void testParquetFullEngineLocalPath(String planFileName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
+ testParquetFullEngineLocalText(Files.toString(FileUtils.getResourceAsFile(planFileName), Charsets.UTF_8), filename, numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup);
+ }
+
+ //specific tests should call this method, but it is not marked as a test itself intentionally
+ public void testParquetFullEngineLocalText(String planText, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
@@ -224,8 +249,11 @@ public class ParquetRecordReaderTest {
client.connect();
RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
- client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
+ Stopwatch watch = new Stopwatch().start();
+ client.runQuery(UserProtos.QueryType.LOGICAL, planText, resultListener);
resultListener.get();
+ System.out.println(String.format("Took %d ms to run query", watch.elapsed(TimeUnit.MILLISECONDS)));
+
}
}
@@ -312,7 +340,8 @@ public class ParquetRecordReaderTest {
assertEquals(footer.getFile().getName(), keyValueMetaData.get(footer.getFile().getName()));
}
}
-
+
+
private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes)
throws IOException {
PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
index 72f9123..1f1f01b 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
@@ -54,7 +54,7 @@ public class TestFileGenerator {
this.parquetType = parquetType;
this.name = name;
this.bitLength = bitLength;
- this.numberOfPages = Math.max(1, (int) Math.ceil(recordsPerRowGroup * bitLength / 8.0 / bytesPerPage));
+ this.numberOfPages = Math.max(1, (int) Math.ceil( ((long) recordsPerRowGroup) * bitLength / 8.0 / bytesPerPage));
this.values = values;
// generator is designed to use 3 values
assert values.length == 3;
@@ -91,7 +91,7 @@ public class TestFileGenerator {
fields.put("bigInt/", new FieldInfo(recordsPerRowGroup, "int64", "bigInt", 64, longVals, TypeProtos.MinorType.BIGINT));
fields.put("f/", new FieldInfo(recordsPerRowGroup, "float", "f", 32, floatVals, TypeProtos.MinorType.FLOAT4));
fields.put("d/", new FieldInfo(recordsPerRowGroup, "double", "d", 64, doubleVals, TypeProtos.MinorType.FLOAT8));
- // fields.put("b/", new FieldInfo("binary", "b", 1, boolVals, TypeProtos.MinorType.BIT));
+ fields.put("b/", new FieldInfo(recordsPerRowGroup, "boolean", "b", 1, boolVals, TypeProtos.MinorType.BIT));
fields.put("bin/", new FieldInfo(recordsPerRowGroup, "binary", "bin", -1, binVals, TypeProtos.MinorType.VARBINARY));
fields.put("bin2/", new FieldInfo(recordsPerRowGroup, "binary", "bin2", -1, bin2Vals, TypeProtos.MinorType.VARBINARY));
return fields;
@@ -129,6 +129,8 @@ public class TestFileGenerator {
int valsWritten;
for (int k = 0; k < numberRowGroups; k++) {
w.startBlock(1);
+ currentBooleanByte = 0;
+ booleanBitCounter.reset();
for (FieldInfo fieldInfo : fields.values()) {
@@ -143,7 +145,7 @@ public class TestFileGenerator {
ColumnDescriptor c1 = schema.getColumnDescription(path1);
w.startColumn(c1, recordsPerRowGroup, codec);
- int valsPerPage = (int) Math.ceil(recordsPerRowGroup / (float) ((int) fieldInfo.numberOfPages));
+ int valsPerPage = (int) Math.ceil(recordsPerRowGroup / (float) fieldInfo.numberOfPages);
byte[] bytes;
// for variable length binary fields
int bytesNeededToEncodeLength = 4;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json
new file mode 100644
index 0000000..29cab68
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json
@@ -0,0 +1,44 @@
+{
+ head:{
+ type:"APACHE_DRILL_LOGICAL",
+ version:"1",
+ generator:{
+ type:"manual",
+ info:"na"
+ }
+ },
+ storage:{
+ "parquet" :
+ {
+ "type":"parquet",
+ "dfsName" : "file:///"
+ }
+ },
+ query:[
+ {
+ @id:"1",
+ op:"scan",
+ memo:"initial_scan",
+ storageengine:"parquet",
+ selection: [
+ {
+ path: "/tmp/parquet_test_file_many_types"
+ },
+ {
+ path: "/tmp/parquet_test_file_many_types"
+ }
+ ]
+ },
+ {
+ @id:"2",
+ input: 1,
+ op: "store",
+ memo: "output sink",
+ target: {
+ file: "console:///stdout"
+ }
+
+ }
+
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json
new file mode 100644
index 0000000..af76e01
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json
@@ -0,0 +1,39 @@
+{
+ head:{
+ type:"APACHE_DRILL_LOGICAL",
+ version:"1",
+ generator:{
+ type:"manual",
+ info:"na"
+ }
+ },
+ storage:{
+ "parquet" :
+ {
+ "type":"parquet",
+ "dfsName" : "file:///"
+ }
+ },
+ query:[
+ {
+ @id:"1",
+ op:"scan",
+ memo:"initial_scan",
+ storageengine:"parquet",
+ selection: [
+ &REPLACED_IN_PARQUET_TEST&
+ ]
+ },
+ {
+ @id:"2",
+ input: 1,
+ op: "store",
+ memo: "output sink",
+ target: {
+ file: "console:///stdout"
+ }
+
+ }
+
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
new file mode 100644
index 0000000..f508d09
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
@@ -0,0 +1,35 @@
+{
+ head : {
+ type : "APACHE_DRILL_PHYSICAL",
+ version : 1,
+ generator : {
+ type : "manual"
+ }
+ },
+ graph : [ {
+ pop : "parquet-scan",
+ @id : 1,
+ entries : [
+ {
+ path : "/tmp/testParquetFile_many_types_3"
+ },
+ {
+ path : "/tmp/testParquetFile_many_types_3"
+ }
+ ],
+ storageengine:{
+ "type":"parquet",
+ "dfsName" : "file:///"
+ }
+ },
+ {
+ "@id": 2,
+ "child": 1,
+ "pop": "union-exchange"
+ },
+ {
+ pop : "screen",
+ @id : 3,
+ child : 2
+ } ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
deleted file mode 100644
index 29cab68..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
+++ /dev/null
@@ -1,44 +0,0 @@
-{
- head:{
- type:"APACHE_DRILL_LOGICAL",
- version:"1",
- generator:{
- type:"manual",
- info:"na"
- }
- },
- storage:{
- "parquet" :
- {
- "type":"parquet",
- "dfsName" : "file:///"
- }
- },
- query:[
- {
- @id:"1",
- op:"scan",
- memo:"initial_scan",
- storageengine:"parquet",
- selection: [
- {
- path: "/tmp/parquet_test_file_many_types"
- },
- {
- path: "/tmp/parquet_test_file_many_types"
- }
- ]
- },
- {
- @id:"2",
- input: 1,
- op: "store",
- memo: "output sink",
- target: {
- file: "console:///stdout"
- }
-
- }
-
- ]
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
deleted file mode 100644
index af76e01..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
+++ /dev/null
@@ -1,39 +0,0 @@
-{
- head:{
- type:"APACHE_DRILL_LOGICAL",
- version:"1",
- generator:{
- type:"manual",
- info:"na"
- }
- },
- storage:{
- "parquet" :
- {
- "type":"parquet",
- "dfsName" : "file:///"
- }
- },
- query:[
- {
- @id:"1",
- op:"scan",
- memo:"initial_scan",
- storageengine:"parquet",
- selection: [
- &REPLACED_IN_PARQUET_TEST&
- ]
- },
- {
- @id:"2",
- input: 1,
- op: "store",
- memo: "output sink",
- target: {
- file: "console:///stdout"
- }
-
- }
-
- ]
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json
deleted file mode 100644
index 954082c..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json
+++ /dev/null
@@ -1,35 +0,0 @@
-{
- head : {
- type : "APACHE_DRILL_PHYSICAL",
- version : 1,
- generator : {
- type : "manual"
- }
- },
- graph : [ {
- pop : "parquet-scan",
- @id : 1,
- entries : [
- {
- path : "/tmp/testParquetFile_many_types_3"
- },
- {
- path : "/tmp/testParquetFile_many_types_3"
- }
- ],
- storageengine:{
- "type":"parquet",
- "dfsName" : "maprfs:///"
- }
- },
- {
- "@id": 2,
- "child": 1,
- "pop": "union-exchange"
- },
- {
- pop : "screen",
- @id : 3,
- child : 2
- } ]
-}
\ No newline at end of file