You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 05:38:32 UTC
[1/2] git commit: Updates to Parquet for VariableLengthVectors and Bit fields
Updated Branches:
refs/heads/master 47985bad0 -> c50135256
Updates to Parquet for VariableLengthVectors and Bit fields
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/556bd963
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/556bd963
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/556bd963
Branch: refs/heads/master
Commit: 556bd963aff5e01ab041db3529a7ae4f847f9bd0
Parents: 47985ba
Author: Jason Altekruse <al...@gmial.com>
Authored: Tue Aug 13 18:22:49 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 18:54:14 2013 -0700
----------------------------------------------------------------------
.../templates/VariableLengthVectors.java | 3 +-
.../drill/exec/store/parquet/BitReader.java | 3 +-
.../drill/exec/store/parquet/ColumnReader.java | 5 +--
.../exec/store/parquet/PageReadStatus.java | 2 +-
.../exec/store/parquet/ParquetRecordReader.java | 6 +--
.../drill/exec/vector/BaseDataValueVector.java | 4 +-
.../org/apache/drill/exec/vector/BitVector.java | 13 ++++++-
.../parquet_scan_screen_read_entry_replace.json | 39 ++++++++++++++++++++
sandbox/prototype/pom.xml | 2 +-
9 files changed, 60 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
index 3be6dc2..e3c59fc 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
@@ -14,7 +14,6 @@ import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Random;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.SchemaDefProtos;
import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
@@ -75,7 +74,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
@Override
public FieldMetadata getMetadata() {
- int len = (valueCount+1) * ${type.width} + getVarByteLength();
+ int len = (valueCount + 1) * ${type.width} + getVarByteLength();
return FieldMetadata.newBuilder()
.setDef(getField().getDef())
.setValueCount(valueCount)
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
index c85d4aa..8727341 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
@@ -52,11 +52,10 @@ public final class BitReader extends ColumnReader {
vectorData = ((BaseDataValueVector) valueVecHolder.getValueVector()).getData();
nextByte = bytes[(int) Math.max(0, Math.ceil(pageReadStatus.valuesRead / 8.0) - 1)];
readLengthInBits = recordsReadInThisIteration + pageReadStatus.bitShift;
- //recordsReadInThisIteration -= (8 - pageReadStatus.bitShift);
int i = 0;
// read individual bytes with appropriate shifting
- for (; i <= (int) readLength; i++) {
+ for (; i < (int) readLength; i++) {
currentByte = nextByte;
currentByte = (byte) (currentByte >>> pageReadStatus.bitShift);
// mask the bits about to be added from the next byte
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index 99b65e6..c62613a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -39,8 +39,6 @@ public abstract class ColumnReader {
long readPositionInBuffer;
- int compressedSize;
-
// quick reference to see if the field is fixed length (as this requires an instanceof)
boolean isFixedLength;
// counter for the total number of values read from one or more pages
@@ -89,7 +87,6 @@ public abstract class ColumnReader {
// if no page has been read, or all of the records have been read out of a page, read the next one
if (pageReadStatus.currentPage == null
|| pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
- totalValuesRead += pageReadStatus.valuesRead;
if (!pageReadStatus.next()) {
break;
}
@@ -107,7 +104,7 @@ public abstract class ColumnReader {
}
}
while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
- ((BaseDataValueVector) valueVecHolder.getValueVector()).getMutator().setValueCount(
+ valueVecHolder.getValueVector().getMutator().setValueCount(
valuesReadInCurrentPass);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index 0378960..c5c0d87 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -80,7 +80,7 @@ public final class PageReadStatus {
// because it is needed, but there might be a problem with it
ByteBufInputStream f = new ByteBufInputStream(parentColumnReader.parentReader.getBufferWithAllData().slice(
(int) parentColumnReader.readPositionInBuffer,
- Math.min(50, parentColumnReader.parentReader.getBufferWithAllData().capacity() - (int) parentColumnReader.readPositionInBuffer)));
+ Math.min(200, parentColumnReader.parentReader.getBufferWithAllData().capacity() - (int) parentColumnReader.readPositionInBuffer)));
int before = f.available();
PageHeader pageHeader = readPageHeader(f);
int length = before - f.available();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 2d36a08..4e46034 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -203,8 +203,6 @@ public class ParquetRecordReader implements RecordReader {
@Override
public void setup(OutputMutator output) throws ExecutionSetupException {
- long tA = System.nanoTime(), tB;
- System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ParquetRecordReader.setup");
output.removeAllFields();
try {
@@ -232,6 +230,9 @@ public class ParquetRecordReader implements RecordReader {
else{
start = rowGroupOffset;
}
+ // TODO - the methods for getting the total size and the total uncompressed size seem to return the opposite of
+ // what they should
+ // I found the bug in mainline and filed an issue for it; hopefully it will be fixed soon
for (ColumnReader crs : columnStatuses){
totalByteLength += crs.columnChunkMetaData.getTotalSize();
}
@@ -256,7 +257,6 @@ public class ParquetRecordReader implements RecordReader {
} catch (IOException e) {
throw new ExecutionSetupException("Error opening or reading metatdata for parquet file at location: " + hadoopPath.getName());
}
- System.out.println( "Total time in method: " + ((float) (System.nanoTime() - tA) / 1e9));
}
private static String toFieldName(String[] paths) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index 54a6cb8..f41dcd2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -44,9 +44,7 @@ public abstract class BaseDataValueVector extends BaseValueVector{
}
@Override
- public FieldMetadata getMetadata() {
- return null;
- }
+ public abstract FieldMetadata getMetadata();
public ByteBuf getData(){
return data;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 910b80e..c868dff 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -5,6 +5,7 @@ import io.netty.buffer.ByteBuf;
import java.util.Random;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.record.DeadBuf;
import org.apache.drill.exec.record.MaterializedField;
@@ -27,8 +28,17 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
super(field, allocator);
}
+ @Override
+ public FieldMetadata getMetadata() {
+ return FieldMetadata.newBuilder()
+ .setDef(getField().getDef())
+ .setValueCount(valueCount)
+ .setBufferLength( (int) Math.ceil(valueCount / 8.0))
+ .build();
+ }
+
private int getSizeFromCount(int valueCount) {
- return (int) Math.ceil((float)valueCount / 8);
+ return (int) Math.ceil((float)valueCount / 8.0);
}
/**
@@ -42,6 +52,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
valueCapacity = valueCount;
int valueSize = getSizeFromCount(valueCount);
data = allocator.buffer(valueSize);
+ this.data.retain();
for (int i = 0; i < valueSize; i++) {
data.setByte(i, 0);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
new file mode 100644
index 0000000..af76e01
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
@@ -0,0 +1,39 @@
+{
+ head:{
+ type:"APACHE_DRILL_LOGICAL",
+ version:"1",
+ generator:{
+ type:"manual",
+ info:"na"
+ }
+ },
+ storage:{
+ "parquet" :
+ {
+ "type":"parquet",
+ "dfsName" : "file:///"
+ }
+ },
+ query:[
+ {
+ @id:"1",
+ op:"scan",
+ memo:"initial_scan",
+ storageengine:"parquet",
+ selection: [
+ &REPLACED_IN_PARQUET_TEST&
+ ]
+ },
+ {
+ @id:"2",
+ input: 1,
+ op: "store",
+ memo: "output sink",
+ target: {
+ file: "console:///stdout"
+ }
+
+ }
+
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/556bd963/sandbox/prototype/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/pom.xml b/sandbox/prototype/pom.xml
index 382c5ff..76bdf24 100644
--- a/sandbox/prototype/pom.xml
+++ b/sandbox/prototype/pom.xml
@@ -131,7 +131,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.15</version>
<configuration>
- <argLine>-XX:MaxDirectMemorySize=4096M</argLine>
+ <argLine>-XX:MaxDirectMemorySize=4096M </argLine>
</configuration>
</plugin>
<!--This plugin's configuration is used to store Eclipse m2e settings
[2/2] git commit: Updates for Parquet varlen merge
Posted by ja...@apache.org.
Updates for Parquet varlen merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c5013525
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c5013525
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c5013525
Branch: refs/heads/master
Commit: c501352565d1c8e39e1f6e7e66d9fa3a0fe2bf9e
Parents: 556bd96
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Aug 15 20:38:04 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 20:38:04 2013 -0700
----------------------------------------------------------------------
.../exec/store/parquet/ParquetGroupScan.java | 34 ++++++-----
.../store/parquet/ParquetRecordReaderTest.java | 61 +++++++++++++++-----
.../exec/store/parquet/TestFileGenerator.java | 8 ++-
.../resources/parquet/parquet_scan_screen.json | 44 ++++++++++++++
.../parquet_scan_screen_read_entry_replace.json | 39 +++++++++++++
.../parquet_scan_union_screen_physical.json | 35 +++++++++++
.../src/test/resources/parquet_scan_screen.json | 44 --------------
.../parquet_scan_screen_read_entry_replace.json | 39 -------------
.../parquet_scan_union_screen_physical.json | 35 -----------
9 files changed, 186 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 66c1550..9e48d33 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,16 +18,14 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.exception.SetupException;
import org.apache.drill.exec.physical.EndpointAffinity;
@@ -38,13 +36,9 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.StorageEngineRegistry;
import org.apache.drill.exec.store.AffinityCalculator;
-import org.apache.drill.exec.store.mock.MockGroupScanPOP;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -54,6 +48,12 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.org.codehaus.jackson.annotate.JsonCreator;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
@JsonTypeName("parquet-scan")
public class ParquetGroupScan extends AbstractGroupScan {
@@ -116,8 +116,10 @@ public class ParquetGroupScan extends AbstractGroupScan {
ColumnChunkMetaData columnChunkMetaData;
for (ReadEntryWithPath readEntryWithPath : entries){
Path path = new Path(readEntryWithPath.getPath());
-
ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
+// FileSystem fs = FileSystem.get(this.storageEngine.getHadoopConfig());
+// FileStatus status = fs.getFileStatus(path);
+// ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
readEntryWithPath.getPath();
int i = 0;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 5628f50..1d91455 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.types.TypeProtos;
@@ -46,6 +47,7 @@ import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.junit.BeforeClass;
import org.junit.Test;
import parquet.bytes.BytesInput;
@@ -57,6 +59,7 @@ import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;
import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.util.concurrent.SettableFuture;
@@ -66,23 +69,38 @@ public class ParquetRecordReaderTest {
private boolean VERBOSE_DEBUG = false;
-
- public static void main(String[] args) throws Exception{
- new ParquetRecordReaderTest().testMultipleRowGroupsAndReadsEvent();
- }
+ static final int numberRowGroups = 20;
+ static final int recordsPerRowGroup = 300000;
+ static final String fileName = "/tmp/parquet_test_file_many_types";
-
- @Test
- public void testMultipleRowGroupsAndReadsEvent() throws Exception {
- String planName = "/parquet_scan_screen.json";
- String fileName = "/tmp/parquet_test_file_many_types";
- int numberRowGroups = 20;
- int recordsPerRowGroup = 300000;
+ @BeforeClass
+ public static void generateFile() throws Exception{
File f = new File(fileName);
if(!f.exists()) TestFileGenerator.generateParquetFile(fileName, numberRowGroups, recordsPerRowGroup);
- testParquetFullEngineLocal(planName, fileName, 2, numberRowGroups, recordsPerRowGroup);
+ }
+
+ @Test
+ public void testMultipleRowGroupsAndReads() throws Exception {
+ String planName = "/parquet/parquet_scan_screen.json";
+ testParquetFullEngineLocalPath(planName, fileName, 2, numberRowGroups, recordsPerRowGroup);
+ }
+
+ @Test
+ public void testMultipleRowGroupsAndReads2() throws Exception {
+ String readEntries;
+ readEntries = "";
+ // number of times to read the file
+ int i = 3;
+ for (int j = 0; j < i; j++){
+ readEntries += "{path: \""+fileName+"\"}";
+ if (j < i - 1)
+ readEntries += ",";
+ }
+ String planText = Files.toString(FileUtils.getResourceAsFile("/parquet/parquet_scan_screen_read_entry_replace.json"), Charsets.UTF_8).replaceFirst( "&REPLACED_IN_PARQUET_TEST&", readEntries);
+ testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup);
}
+
private class ParquetResultListener implements UserResultsListener {
private SettableFuture<Void> future = SettableFuture.create();
RecordBatchLoader batchLoader;
@@ -198,6 +216,8 @@ public class ParquetRecordReaderTest {
}
+
+
public void testParquetFullEngineRemote(String plan, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
DrillConfig config = DrillConfig.create();
@@ -212,8 +232,13 @@ public class ParquetRecordReaderTest {
}
- // specific tests should call this method, but it is not marked as a test itself intentionally
- public void testParquetFullEngineLocal(String plan, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
+
+ public void testParquetFullEngineLocalPath(String planFileName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
+ testParquetFullEngineLocalText(Files.toString(FileUtils.getResourceAsFile(planFileName), Charsets.UTF_8), filename, numberOfTimesRead, numberOfRowGroups, recordsPerRowGroup);
+ }
+
+ //specific tests should call this method, but it is not marked as a test itself intentionally
+ public void testParquetFullEngineLocalText(String planText, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
@@ -224,8 +249,11 @@ public class ParquetRecordReaderTest {
client.connect();
RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
- client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
+ Stopwatch watch = new Stopwatch().start();
+ client.runQuery(UserProtos.QueryType.LOGICAL, planText, resultListener);
resultListener.get();
+ System.out.println(String.format("Took %d ms to run query", watch.elapsed(TimeUnit.MILLISECONDS)));
+
}
}
@@ -312,7 +340,8 @@ public class ParquetRecordReaderTest {
assertEquals(footer.getFile().getName(), keyValueMetaData.get(footer.getFile().getName()));
}
}
-
+
+
private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes)
throws IOException {
PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
index 72f9123..1f1f01b 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
@@ -54,7 +54,7 @@ public class TestFileGenerator {
this.parquetType = parquetType;
this.name = name;
this.bitLength = bitLength;
- this.numberOfPages = Math.max(1, (int) Math.ceil(recordsPerRowGroup * bitLength / 8.0 / bytesPerPage));
+ this.numberOfPages = Math.max(1, (int) Math.ceil( ((long) recordsPerRowGroup) * bitLength / 8.0 / bytesPerPage));
this.values = values;
// generator is designed to use 3 values
assert values.length == 3;
@@ -91,7 +91,7 @@ public class TestFileGenerator {
fields.put("bigInt/", new FieldInfo(recordsPerRowGroup, "int64", "bigInt", 64, longVals, TypeProtos.MinorType.BIGINT));
fields.put("f/", new FieldInfo(recordsPerRowGroup, "float", "f", 32, floatVals, TypeProtos.MinorType.FLOAT4));
fields.put("d/", new FieldInfo(recordsPerRowGroup, "double", "d", 64, doubleVals, TypeProtos.MinorType.FLOAT8));
- // fields.put("b/", new FieldInfo("binary", "b", 1, boolVals, TypeProtos.MinorType.BIT));
+ fields.put("b/", new FieldInfo(recordsPerRowGroup, "boolean", "b", 1, boolVals, TypeProtos.MinorType.BIT));
fields.put("bin/", new FieldInfo(recordsPerRowGroup, "binary", "bin", -1, binVals, TypeProtos.MinorType.VARBINARY));
fields.put("bin2/", new FieldInfo(recordsPerRowGroup, "binary", "bin2", -1, bin2Vals, TypeProtos.MinorType.VARBINARY));
return fields;
@@ -129,6 +129,8 @@ public class TestFileGenerator {
int valsWritten;
for (int k = 0; k < numberRowGroups; k++) {
w.startBlock(1);
+ currentBooleanByte = 0;
+ booleanBitCounter.reset();
for (FieldInfo fieldInfo : fields.values()) {
@@ -143,7 +145,7 @@ public class TestFileGenerator {
ColumnDescriptor c1 = schema.getColumnDescription(path1);
w.startColumn(c1, recordsPerRowGroup, codec);
- int valsPerPage = (int) Math.ceil(recordsPerRowGroup / (float) ((int) fieldInfo.numberOfPages));
+ int valsPerPage = (int) Math.ceil(recordsPerRowGroup / (float) fieldInfo.numberOfPages);
byte[] bytes;
// for variable length binary fields
int bytesNeededToEncodeLength = 4;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json
new file mode 100644
index 0000000..29cab68
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen.json
@@ -0,0 +1,44 @@
+{
+ head:{
+ type:"APACHE_DRILL_LOGICAL",
+ version:"1",
+ generator:{
+ type:"manual",
+ info:"na"
+ }
+ },
+ storage:{
+ "parquet" :
+ {
+ "type":"parquet",
+ "dfsName" : "file:///"
+ }
+ },
+ query:[
+ {
+ @id:"1",
+ op:"scan",
+ memo:"initial_scan",
+ storageengine:"parquet",
+ selection: [
+ {
+ path: "/tmp/parquet_test_file_many_types"
+ },
+ {
+ path: "/tmp/parquet_test_file_many_types"
+ }
+ ]
+ },
+ {
+ @id:"2",
+ input: 1,
+ op: "store",
+ memo: "output sink",
+ target: {
+ file: "console:///stdout"
+ }
+
+ }
+
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json
new file mode 100644
index 0000000..af76e01
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_screen_read_entry_replace.json
@@ -0,0 +1,39 @@
+{
+ head:{
+ type:"APACHE_DRILL_LOGICAL",
+ version:"1",
+ generator:{
+ type:"manual",
+ info:"na"
+ }
+ },
+ storage:{
+ "parquet" :
+ {
+ "type":"parquet",
+ "dfsName" : "file:///"
+ }
+ },
+ query:[
+ {
+ @id:"1",
+ op:"scan",
+ memo:"initial_scan",
+ storageengine:"parquet",
+ selection: [
+ &REPLACED_IN_PARQUET_TEST&
+ ]
+ },
+ {
+ @id:"2",
+ input: 1,
+ op: "store",
+ memo: "output sink",
+ target: {
+ file: "console:///stdout"
+ }
+
+ }
+
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
new file mode 100644
index 0000000..f508d09
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
@@ -0,0 +1,35 @@
+{
+ head : {
+ type : "APACHE_DRILL_PHYSICAL",
+ version : 1,
+ generator : {
+ type : "manual"
+ }
+ },
+ graph : [ {
+ pop : "parquet-scan",
+ @id : 1,
+ entries : [
+ {
+ path : "/tmp/testParquetFile_many_types_3"
+ },
+ {
+ path : "/tmp/testParquetFile_many_types_3"
+ }
+ ],
+ storageengine:{
+ "type":"parquet",
+ "dfsName" : "file:///"
+ }
+ },
+ {
+ "@id": 2,
+ "child": 1,
+ "pop": "union-exchange"
+ },
+ {
+ pop : "screen",
+ @id : 3,
+ child : 2
+ } ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
deleted file mode 100644
index 29cab68..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen.json
+++ /dev/null
@@ -1,44 +0,0 @@
-{
- head:{
- type:"APACHE_DRILL_LOGICAL",
- version:"1",
- generator:{
- type:"manual",
- info:"na"
- }
- },
- storage:{
- "parquet" :
- {
- "type":"parquet",
- "dfsName" : "file:///"
- }
- },
- query:[
- {
- @id:"1",
- op:"scan",
- memo:"initial_scan",
- storageengine:"parquet",
- selection: [
- {
- path: "/tmp/parquet_test_file_many_types"
- },
- {
- path: "/tmp/parquet_test_file_many_types"
- }
- ]
- },
- {
- @id:"2",
- input: 1,
- op: "store",
- memo: "output sink",
- target: {
- file: "console:///stdout"
- }
-
- }
-
- ]
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
deleted file mode 100644
index af76e01..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_screen_read_entry_replace.json
+++ /dev/null
@@ -1,39 +0,0 @@
-{
- head:{
- type:"APACHE_DRILL_LOGICAL",
- version:"1",
- generator:{
- type:"manual",
- info:"na"
- }
- },
- storage:{
- "parquet" :
- {
- "type":"parquet",
- "dfsName" : "file:///"
- }
- },
- query:[
- {
- @id:"1",
- op:"scan",
- memo:"initial_scan",
- storageengine:"parquet",
- selection: [
- &REPLACED_IN_PARQUET_TEST&
- ]
- },
- {
- @id:"2",
- input: 1,
- op: "store",
- memo: "output sink",
- target: {
- file: "console:///stdout"
- }
-
- }
-
- ]
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5013525/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json
deleted file mode 100644
index 954082c..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet_scan_union_screen_physical.json
+++ /dev/null
@@ -1,35 +0,0 @@
-{
- head : {
- type : "APACHE_DRILL_PHYSICAL",
- version : 1,
- generator : {
- type : "manual"
- }
- },
- graph : [ {
- pop : "parquet-scan",
- @id : 1,
- entries : [
- {
- path : "/tmp/testParquetFile_many_types_3"
- },
- {
- path : "/tmp/testParquetFile_many_types_3"
- }
- ],
- storageengine:{
- "type":"parquet",
- "dfsName" : "maprfs:///"
- }
- },
- {
- "@id": 2,
- "child": 1,
- "pop": "union-exchange"
- },
- {
- pop : "screen",
- @id : 3,
- child : 2
- } ]
-}
\ No newline at end of file