You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:47 UTC
[09/27] Initial Parquet commit. Supports INT, LONG, FLOAT, DOUBLE,
distributed scheduling.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
new file mode 100644
index 0000000..2ad7b44
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -0,0 +1,403 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+import parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ParquetRecordReader implements RecordReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
+
+ // Multiplier applied to the default batch length. Raising it above 1 inflates the batch so that several
+ // value vectors' worth of data is read at once, to be broken up into smaller vectors afterwards.
+ private static final int NUMBER_OF_VECTORS = 1;
+ private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256 KiB (in bytes) per vector
+ private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // the same length expressed in bits
+
+ // TODO - should probably find a smarter way to set this, currently 2 megabytes
+ private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 2;
+ // 5 * 1024 * 1024
+ public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 5;
+ // Platform file separator; toFieldName() uses it to join the elements of a column path.
+ private static final String SEPERATOR = System.getProperty("file.separator");
+
+
+ // Masks for clearing the low-order bits of a byte: endBitMasks[i] has its (i + 1) least-significant bits cleared.
+ public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
+ // Masks for clearing the high-order bits of a byte: startBitMasks[i] has its (i + 1) most-significant bits cleared.
+ public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
+
+ // Sum of the bit widths of all fixed-width columns (computed in the constructor).
+ private int bitWidthAllFixedFields;
+ // False as soon as the schema contains a BINARY column.
+ private boolean allFieldsFixedLength;
+ // Only computed when every column is fixed width.
+ private int recordsPerBatch;
+ // Raw column-chunk bytes copied from the file by setup().
+ private ByteBuf bufferWithAllData;
+ // Row count of the row group, taken from the footer.
+ long totalRecords;
+ // First data page offset of the first column chunk of the row group.
+ long rowGroupOffset;
+
+ // Readers for the fixed-width columns; variable-length columns live in varLengthReader.
+ private List<ColumnReader> columnStatuses;
+ FileSystem fileSystem;
+ private BufferAllocator allocator;
+ private long batchSize;
+ Path hadoopPath;
+ private final VarLenBinaryReader varLengthReader;
+
+ /** Returns the codec factory handed to the constructor. */
+ public CodecFactoryExposer getCodecFactoryExposer() {
+ return codecFactoryExposer;
+ }
+
+ private final CodecFactoryExposer codecFactoryExposer;
+
+ // Index of the row group read by this reader.
+ int rowGroupIndex;
+
+ /**
+  * Convenience constructor that delegates to the full constructor with
+  * {@code DEFAULT_BATCH_LENGTH_IN_BITS} as the batch size.
+  */
+ public ParquetRecordReader(
+     FragmentContext fragmentContext,
+     String path,
+     int rowGroupIndex,
+     FileSystem fs,
+     CodecFactoryExposer codecFactoryExposer,
+     ParquetMetadata footer) throws ExecutionSetupException {
+   this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer);
+ }
+
+
+ public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
+ String path, int rowGroupIndex, FileSystem fs,
+ CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer) throws ExecutionSetupException {
+ this.allocator = fragmentContext.getAllocator();
+
+ hadoopPath = new Path(path);
+ fileSystem = fs;
+ this.codecFactoryExposer = codecFactoryExposer;
+ this.rowGroupIndex = rowGroupIndex;
+ this.batchSize = batchSize;
+
+ columnStatuses = new ArrayList<>();
+
+ totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+ List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
+ allFieldsFixedLength = true;
+ ColumnDescriptor column;
+ ColumnChunkMetaData columnChunkMetaData;
+
+ // loop to add up the length of the fixed width columns and build the schema
+ for (int i = 0; i < columns.size(); ++i) {
+ column = columns.get(i);
+
+ // sum the lengths of all of the fixed length fields
+ if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+ // There is not support for the fixed binary type yet in parquet, leaving a task here as a reminder
+ // TODO - implement this when the feature is added upstream
+// if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
+// byteWidthAllFixedFields += column.getType().getWidth()
+// }
+// else { } // the code below for the rest of the fixed length fields
+
+ bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
+ } else {
+ allFieldsFixedLength = false;
+ }
+
+ }
+ rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+
+ if (allFieldsFixedLength) {
+ recordsPerBatch = (int) Math.min(batchSize / bitWidthAllFixedFields, footer.getBlocks().get(0).getColumns().get(0).getValueCount());
+ }
+ try {
+ ArrayList<VarLenBinaryReader.VarLengthColumn> varLengthColumns = new ArrayList<>();
+ // initialize all of the column read status objects
+ boolean fieldFixedLength = false;
+ MaterializedField field;
+ for (int i = 0; i < columns.size(); ++i) {
+ column = columns.get(i);
+ columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i);
+ field = MaterializedField.create(new SchemaPath(toFieldName(column.getPath()), ExpressionPosition.UNKNOWN),
+ toMajorType(column.getType(), getDataMode(column, footer.getFileMetaData().getSchema())));
+ fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
+ ValueVector v = TypeHelper.getNewVector(field, allocator);
+ if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+ createFixedColumnReader(fieldFixedLength, field, column, columnChunkMetaData, recordsPerBatch, v);
+ } else {
+ varLengthColumns.add(new VarLenBinaryReader.VarLengthColumn(this, -1, column, columnChunkMetaData, false, v));
+ }
+ }
+ varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
+ } catch (SchemaChangeException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ /** Returns the buffer holding the raw column data; it is only assigned by {@code setup}. */
+ public ByteBuf getBufferWithAllData() {
+   return this.bufferWithAllData;
+ }
+
+ /** Returns the index of the row group this reader was created for. */
+ public int getRowGroupIndex() {
+   return this.rowGroupIndex;
+ }
+
+ /** Returns the summed width, in bits, of all fixed-width columns. */
+ public int getBitWidthAllFixedFields() {
+   return this.bitWidthAllFixedFields;
+ }
+
+ /** Returns the batch size this reader was constructed with. */
+ public long getBatchSize() {
+   return this.batchSize;
+ }
+
+ /**
+  * Looks up the width of a fixed-width parquet primitive type.
+  *
+  * @param type a fixed length type from the parquet library enum
+  * @return the width of one value of that type, in bits
+  * @throws IllegalStateException for any other type (binary and fixed length byte array)
+  */
+ public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) {
+   switch (type) {
+     case BOOLEAN:
+       return 1;
+     case INT32:
+     case FLOAT:
+       return 32;
+     case INT64:
+     case DOUBLE:
+       return 64;
+     case INT96:
+       return 96;
+     default:
+       throw new IllegalStateException("Length cannot be determined for type " + type);
+   }
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+ long tA = System.nanoTime(), tB;
+ System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ParquetRecordReader.setup");
+ output.removeAllFields();
+
+ try {
+ for (ColumnReader crs : columnStatuses) {
+ output.addField(crs.valueVecHolder.getValueVector());
+ }
+ for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) {
+ output.addField(r.valueVecHolder.getValueVector());
+ output.setNewSchema();
+ }
+ }catch(SchemaChangeException e) {
+ throw new ExecutionSetupException("Error setting up output mutator.", e);
+ }
+
+ // the method for reading into a ByteBuf from a stream copies all of the data into a giant buffer
+ // here we do the same thing in a loop to not initialize so much on heap
+
+ // TODO - this should be replaced by an enhancement in Hadoop 2.0 that will allow reading
+ // directly into a ByteBuf passed into the reading method
+ int totalByteLength = 0;
+ long start = 0;
+ if (rowGroupIndex == 0){
+ totalByteLength = 4;
+ }
+ else{
+ start = rowGroupOffset;
+ }
+ for (ColumnReader crs : columnStatuses){
+ totalByteLength += crs.columnChunkMetaData.getTotalSize();
+ }
+ for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){
+ totalByteLength += r.columnChunkMetaData.getTotalSize();
+ }
+ int bufferSize = 64*1024;
+ long totalBytesWritten = 0;
+ int validBytesInCurrentBuffer;
+ byte[] buffer = new byte[bufferSize];
+ try {
+ bufferWithAllData = allocator.buffer(totalByteLength);
+ FSDataInputStream inputStream = fileSystem.open(hadoopPath);
+ inputStream.seek(start);
+ while (totalBytesWritten < totalByteLength){
+ validBytesInCurrentBuffer = (int) Math.min(bufferSize, totalByteLength - totalBytesWritten);
+ inputStream.read(buffer, 0 , validBytesInCurrentBuffer);
+ bufferWithAllData.writeBytes(buffer, 0 , (int) validBytesInCurrentBuffer);
+ totalBytesWritten += validBytesInCurrentBuffer;
+ }
+
+ } catch (IOException e) {
+ throw new ExecutionSetupException("Error opening or reading metatdata for parquet file at location: " + hadoopPath.getName());
+ }
+ System.out.println( "Total time in method: " + ((float) (System.nanoTime() - tA) / 1e9));
+ }
+
+ private static String toFieldName(String[] paths) {
+ return join(SEPERATOR, paths);
+ }
+
+ private TypeProtos.DataMode getDataMode(ColumnDescriptor column, MessageType schema) {
+ if (schema.getColumnDescription(column.getPath()).getMaxDefinitionLevel() == 0) {
+ return TypeProtos.DataMode.REQUIRED;
+ } else {
+ return TypeProtos.DataMode.OPTIONAL;
+ }
+ }
+
+ /** Resets the vector holder and the per-pass value counter of every fixed and variable-length column. */
+ private void resetBatch() {
+   for (ColumnReader fixedColumn : columnStatuses) {
+     fixedColumn.valueVecHolder.reset();
+     fixedColumn.valuesReadInCurrentPass = 0;
+   }
+   for (VarLenBinaryReader.VarLengthColumn varLenColumn : varLengthReader.columns) {
+     varLenColumn.valueVecHolder.reset();
+     varLenColumn.valuesReadInCurrentPass = 0;
+   }
+ }
+
+ /**
+ * @param fixedLength
+ * @param field
+ * @param descriptor
+ * @param columnChunkMetaData
+ * @param allocateSize - the size of the vector to create
+ * @return
+ * @throws SchemaChangeException
+ */
+ private boolean createFixedColumnReader(boolean fixedLength, MaterializedField field, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v)
+ throws SchemaChangeException {
+ TypeProtos.MajorType type = field.getType();
+ if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
+ columnStatuses.add(new BitReader(this, allocateSize, descriptor, columnChunkMetaData,
+ fixedLength, v));
+ }
+ else{
+ columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
+ fixedLength, v));
+ }
+ return true;
+ }
+
+ /**
+  * Asks every fixed-width column reader, in registration order, to read the given number of records.
+  *
+  * @param recordsToRead number of records passed to each column reader
+  * @param firstColumnStatus reader of the first column, passed through to each column reader
+  * @throws IOException if one of the column readers fails
+  */
+ public void readAllFixedFields(long recordsToRead, ColumnReader firstColumnStatus) throws IOException {
+   for (ColumnReader columnReader : columnStatuses) {
+     columnReader.readAllFixedFields(recordsToRead, firstColumnStatus);
+   }
+ }
+
+ /**
+  * Reads the next batch of records into the value vectors.
+  *
+  * @return the number of values the first fixed-width column read in this pass
+  * @throws DrillRuntimeException wrapping any {@link IOException} raised while reading
+  */
+ @Override
+ public int next() {
+   resetBatch();
+   try {
+     final ColumnReader firstColumnStatus = columnStatuses.iterator().next();
+     if (allFieldsFixedLength) {
+       // read a full batch, or whatever is left of the first column chunk
+       final long recordsToRead = Math.min(recordsPerBatch,
+           firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
+       readAllFixedFields(recordsToRead, firstColumnStatus);
+     } else {
+       // Variable length columns: the target of 8000 records is arbitrary. The variable-length reader
+       // reports how many records the fixed-width columns have to read to stay aligned with it.
+       //
+       // going to incorporate looking at length of values and copying the data into a single loop, hopefully it won't
+       // get too complicated
+       //
+       // loop through variable length data to find the maximum records that will fit in this batch
+       // this will be a bit annoying if we want to loop though row groups, columns, pages and then individual variable
+       // length values...
+       // jacques believes that variable length fields will be encoded as |length|value|length|value|...
+       // cannot find more information on this right now, will keep looking
+       final long recordsToRead = 8000;
+       final long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
+       readAllFixedFields(fixedRecordsToRead, firstColumnStatus);
+     }
+     return firstColumnStatus.valuesReadInCurrentPass;
+   } catch (IOException e) {
+     throw new DrillRuntimeException(e);
+   }
+ }
+
+ static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName,
+ TypeProtos.DataMode mode) {
+ return toMajorType(primitiveTypeName, 0, mode);
+ }
+
+ static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
+ TypeProtos.DataMode mode) {
+ switch (primitiveTypeName) {
+ case BINARY:
+ return Types.required(TypeProtos.MinorType.VARBINARY);
+ case INT64:
+ return Types.required(TypeProtos.MinorType.BIGINT);
+ case INT32:
+ return Types.required(TypeProtos.MinorType.INT);
+ case BOOLEAN:
+ return Types.required(TypeProtos.MinorType.BIT);
+ case FLOAT:
+ return Types.required(TypeProtos.MinorType.FLOAT4);
+ case DOUBLE:
+ return Types.required(TypeProtos.MinorType.FLOAT8);
+ // Both of these are not supported by the parquet library yet (7/3/13),
+ // but they are declared here for when they are implemented
+ case INT96:
+ return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
+ .setMode(mode).build();
+ case FIXED_LEN_BYTE_ARRAY:
+ checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
+ return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
+ .setWidth(length).setMode(mode).build();
+ default:
+ throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
+ }
+ }
+
+ /**
+  * Concatenates the given strings, placing {@code delimiter} between consecutive elements only.
+  * The previous check ({@code i < str.length}) was always true, so a stray delimiter was appended
+  * after the last element as well.
+  *
+  * @param delimiter text inserted between two neighbouring elements
+  * @param str elements to join; may be empty
+  * @return the joined string, empty when no elements are given
+  */
+ static String join(String delimiter, String... str) {
+   StringBuilder builder = new StringBuilder();
+   for (int i = 0; i < str.length; i++) {
+     if (i > 0) {
+       builder.append(delimiter);
+     }
+     builder.append(str[i]);
+   }
+   return builder.toString();
+ }
+
+ /**
+  * Drops the fixed-width column readers and resets the data buffer.
+  *
+  * <p>NOTE(review): {@code ByteBuf.clear()} only resets the reader and writer indices; it does not hand
+  * the memory back to the allocator. Confirm whether the buffer should be released here instead.
+  */
+ @Override
+ public void cleanup() {
+   columnStatuses.clear();
+   // the buffer is only allocated by setup(); guard against cleanup after a skipped or failed setup
+   if (bufferWithAllData != null) {
+     bufferWithAllData.clear();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
new file mode 100644
index 0000000..1e5d203
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -0,0 +1,137 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntryFromHDFS;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+
+import java.util.*;
+
+/**
+ * Physical sub-scan holding the information needed to read a list of parquet row groups from HDFS.
+ */
+@JsonTypeName("parquet-row-group-scan")
+public class ParquetRowGroupScan extends AbstractBase implements SubScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRowGroupScan.class);
+
+  public StorageEngineConfig engineConfig;
+  private final ParquetStorageEngine parquetStorageEngine;
+  private final List<RowGroupReadEntry> rowGroupReadEntries;
+
+  /**
+   * Deserialization constructor: resolves the storage engine from the injected registry.
+   *
+   * @param registry registry used to look up the engine for {@code engineConfig}
+   * @param engineConfig configuration of the parquet storage engine
+   * @param rowGroupReadEntries row groups to read
+   * @throws SetupException if the engine cannot be obtained from the registry
+   */
+  @JsonCreator
+  public ParquetRowGroupScan(@JacksonInject StorageEngineRegistry registry, @JsonProperty("engineConfig") StorageEngineConfig engineConfig,
+      @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries) throws SetupException {
+    // Keep the config: it was previously dropped, so getEngineConfig() returned null after deserialization
+    // and getNewWithChildren() handed null to the copy.
+    this.engineConfig = engineConfig;
+    this.parquetStorageEngine = (ParquetStorageEngine) registry.getEngine(engineConfig);
+    this.rowGroupReadEntries = rowGroupReadEntries;
+  }
+
+  /**
+   * Creates a scan for an already resolved storage engine.
+   *
+   * @param engine storage engine the row groups belong to
+   * @param config configuration of that engine
+   * @param rowGroupReadEntries row groups to read
+   * @throws SetupException declared for symmetry with the deserialization constructor
+   */
+  public ParquetRowGroupScan(ParquetStorageEngine engine, ParquetStorageEngineConfig config,
+      List<RowGroupReadEntry> rowGroupReadEntries) throws SetupException {
+    this.parquetStorageEngine = engine;
+    this.engineConfig = config;
+    this.rowGroupReadEntries = rowGroupReadEntries;
+  }
+
+  public List<RowGroupReadEntry> getRowGroupReadEntries() {
+    return rowGroupReadEntries;
+  }
+
+  public StorageEngineConfig getEngineConfig() {
+    return engineConfig;
+  }
+
+  // NOTE(review): cost and size are not implemented for the sub-scan yet; callers receive null.
+  @Override
+  public OperatorCost getCost() {
+    return null;
+  }
+
+  @Override
+  public Size getSize() {
+    return null;
+  }
+
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  @JsonIgnore
+  public ParquetStorageEngine getStorageEngine(){
+    return parquetStorageEngine;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  /**
+   * Returns a copy of this scan; a scan is a leaf operator, so {@code children} must be empty.
+   *
+   * @throws IllegalArgumentException if {@code children} is not empty
+   * @throws IllegalStateException if the copy cannot be constructed
+   */
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    try {
+      return new ParquetRowGroupScan(parquetStorageEngine, (ParquetStorageEngineConfig) engineConfig, rowGroupReadEntries);
+    } catch (SetupException e) {
+      // fail loudly instead of printing the stack trace and handing null to the caller
+      throw new IllegalStateException("Unable to copy ParquetRowGroupScan", e);
+    }
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  /** Read entry identifying one row group of a parquet file stored on HDFS. */
+  public static class RowGroupReadEntry extends ReadEntryFromHDFS {
+
+    private final int rowGroupIndex;
+
+    // Uses the com.fasterxml annotation like the rest of this file; the constructor was previously marked
+    // with the Jackson 1 annotation shaded inside the parquet jar.
+    @JsonCreator
+    public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start,
+        @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+      super(path, start, length);
+      this.rowGroupIndex = rowGroupIndex;
+    }
+
+    @Override
+    public OperatorCost getCost() {
+      return new OperatorCost(1, 2, 1, 1);
+    }
+
+    @Override
+    public Size getSize() {
+      // TODO - these values are wrong, I cannot know these until after I read a file
+      return new Size(10, 10);
+    }
+
+    /** Returns a new entry with the same path, start, length and row group index. */
+    @JsonIgnore
+    public RowGroupReadEntry getRowGroupReadEntry() {
+      return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+    }
+
+    public int getRowGroupIndex(){
+      return rowGroupIndex;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
new file mode 100644
index 0000000..bd63406
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.MockScanBatchCreator;
+
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ * Creates the scan batch for a {@link ParquetRowGroupScan}: one {@link ParquetRecordReader} per row group entry.
+ */
+public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
+  // bound to this class; it was previously bound to MockScanBatchCreator, mislabelling every log line
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
+
+  /**
+   * Builds a record reader for each row group read entry and wraps them in a {@link ScanBatch}.
+   *
+   * @param context fragment context passed to the readers and the batch
+   * @param rowGroupScan scan describing the row groups and the storage engine
+   * @param children must be empty, a scan has no inputs
+   * @return the scan batch iterating over all created readers
+   * @throws ExecutionSetupException if a footer cannot be read or a reader cannot be created
+   */
+  @Override
+  public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
+    final long startNanos = System.nanoTime();
+    logger.debug("Start of ParquetScanBatchCreator.getBatch");
+    Preconditions.checkArgument(children.isEmpty());
+
+    final ParquetStorageEngine engine = rowGroupScan.getStorageEngine();
+    final List<RecordReader> readers = Lists.newArrayList();
+    for (ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()) {
+      /*
+      Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
+      TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
+      we should add more information to the RowGroupInfo that will be populated upon the first read to
+      provide the reader with all of the file meta-data it needs
+      These fields will be added to the constructor below
+      */
+      try {
+        final ParquetMetadata footer =
+            ParquetFileReader.readFooter(engine.getFileSystem().getConf(), new Path(e.getPath()));
+        readers.add(
+            new ParquetRecordReader(
+                context, e.getPath(), e.getRowGroupIndex(), engine.getFileSystem(),
+                engine.getCodecFactoryExposer(), footer));
+      } catch (IOException e1) {
+        throw new ExecutionSetupException(e1);
+      }
+    }
+    logger.debug("Total time in ParquetScanBatchCreator.getBatch: {} s", (System.nanoTime() - startNanos) / 1e9);
+    return new ScanBatch(context, readers.iterator());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
new file mode 100644
index 0000000..f070f0f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.ReadEntryWithPath;
+import org.apache.drill.exec.physical.config.MockStorageEngine;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.google.common.collect.ListMultimap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+
+public class ParquetStorageEngine extends AbstractStorageEngine{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+
+ private final DrillbitContext context;
+ private final ParquetStorageEngineConfig configuration;
+ private FileSystem fs;
+ private Configuration conf;
+ static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+ private CodecFactoryExposer codecFactoryExposer;
+ final ParquetMetadata footer;
+ public static final String HADOOP_DEFAULT_NAME = "fs.default.name";
+
+
+ public ParquetStorageEngine(ParquetStorageEngineConfig configuration, DrillbitContext context){
+ this.context = context;
+ this.configuration = configuration;
+ this.footer = null;
+ try {
+ this.conf = new Configuration();
+ this.conf.set(HADOOP_DEFAULT_NAME, configuration.getDfsName());
+ this.fs = FileSystem.get(conf);
+ codecFactoryExposer = new CodecFactoryExposer(conf);
+ } catch (IOException ie) {
+ throw new RuntimeException("Error setting up filesystem");
+ }
+ }
+
+ public Configuration getHadoopConfig() {
+ return this.conf;
+ }
+
+ public FileSystem getFileSystem() {
+ return this.fs;
+ }
+
+ public DrillbitContext getContext() {
+ return this.context;
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public ParquetGroupScan getPhysicalScan(Scan scan) throws IOException {
+
+ ArrayList<ReadEntryWithPath> readEntries = scan.getSelection().getListWith(new ObjectMapper(),
+ new TypeReference<ArrayList<ReadEntryWithPath>>() {});
+
+ return new ParquetGroupScan(readEntries, this);
+ }
+
+ @Override
+ public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
+ return null;
+ }
+
+ @Override
+ public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
+ return null;
+ }
+
+
+ public CodecFactoryExposer getCodecFactoryExposer() {
+ return codecFactoryExposer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
new file mode 100644
index 0000000..ad55f13
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+
+import java.util.HashMap;
+
+/**
+ * Storage engine configuration for Parquet, registered with Jackson under the type name
+ * "parquet". Identity (equals/hashCode) is defined by the DFS name only.
+ */
+@JsonTypeName("parquet")
+public class ParquetStorageEngineConfig extends StorageEngineConfigBase {
+
+  // information needed to identify an HDFS instance
+  private String dfsName;
+
+  // Free-form key/value settings. Created eagerly so that set() and get() never
+  // dereference a null map. Not part of equals()/hashCode().
+  private HashMap<String,String> map = new HashMap<String,String>();
+
+  /**
+   * @param dfsName name of the distributed file system, read from the "dfsName" JSON property
+   */
+  @JsonCreator
+  public ParquetStorageEngineConfig(@JsonProperty("dfsName") String dfsName) {
+    this.dfsName = dfsName;
+  }
+
+  /** Returns the DFS name supplied at construction; may be null. */
+  public String getDfsName() {
+    return dfsName;
+  }
+
+  /** Two configurations are equal when they are of the same class and have the same DFS name. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ParquetStorageEngineConfig that = (ParquetStorageEngineConfig) o;
+
+    if (dfsName != null ? !dfsName.equals(that.dfsName) : that.dfsName != null) return false;
+
+    return true;
+  }
+
+  /** Hash of the DFS name, or 0 when it is null; consistent with equals. */
+  @Override
+  public int hashCode() {
+    return dfsName != null ? dfsName.hashCode() : 0;
+  }
+
+  /** Stores an extra setting under the given key, replacing any previous value. */
+  public void set(String key, String value) {
+    map.put(key, value);
+  }
+
+  /** Returns the extra setting stored under the given key, or null if there is none. */
+  public String get(String key) {
+    return map.get(key);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
new file mode 100644
index 0000000..08e1023
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -0,0 +1,130 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import parquet.bytes.BytesUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Copies variable length binary values from Parquet pages into VarBinaryVectors,
+ * one record at a time across all variable length columns.
+ */
+public class VarLenBinaryReader {
+
+ ParquetRecordReader parentReader;
+ final List<VarLengthColumn> columns;
+
+ /**
+  * @param parentReader reader queried for the batch size and the combined fixed field width
+  * @param columns the columns handled by this reader
+  */
+ public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns){
+ this.parentReader = parentReader;
+ this.columns = columns;
+ }
+
+ /**
+  * Column state holder for a variable length column. Reading is driven by
+  * VarLenBinaryReader.readFields rather than by the column itself.
+  */
+ public static class VarLengthColumn extends ColumnReader {
+
+ VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+ }
+
+ /** Not supported for variable length columns; always throws UnsupportedOperationException. */
+ @Override
+ protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Reads as many variable length values as possible.
+ *
+ * @param recordsToReadInThisPass - the number of records recommended for reading from the reader
+ * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metadata from
+ *        (not used by the current implementation)
+ * @return - the number of records read in this pass; presumably the caller uses it to bound how many
+ *         fixed length values to read for the same batch - confirm against the caller
+ * @throws IOException
+ */
+ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
+
+ long recordsReadInCurrentPass = 0;
+ int lengthVarFieldsInCurrentRecord;
+ boolean rowGroupFinished = false;
+ byte[] bytes;
+ VarBinaryVector currVec;
+ // write the first 0 offset
+ // and reset the per-pass byte and value counters of every variable length column
+ for (ColumnReader columnReader : columns) {
+ if (columnReader.isFixedLength) {
+ continue;
+ }
+ currVec = (VarBinaryVector) columnReader.valueVecHolder.getValueVector();
+ currVec.getAccessor().getOffsetVector().getData().writeInt(0);
+ columnReader.bytesReadInCurrentPass = 0;
+ columnReader.valuesReadInCurrentPass = 0;
+ }
+ do {
+ // Phase 1: peek at the length of the next value in every column without consuming it.
+ lengthVarFieldsInCurrentRecord = 0;
+ for (ColumnReader columnReader : columns) {
+ if (columnReader.isFixedLength) {
+ continue;
+ }
+ // no page loaded yet, or the current page is exhausted: move on to the next page
+ if (columnReader.pageReadStatus.currentPage == null
+ || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
+ columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
+ if (!columnReader.pageReadStatus.next()) {
+ // no more pages for this column, so the pass ends here
+ rowGroupFinished = true;
+ break;
+ }
+ }
+ bytes = columnReader.pageReadStatus.pageDataByteArray;
+
+ // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+ // each value is stored as a 4 byte little-endian length followed by that many bytes
+ columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes,
+ (int) columnReader.pageReadStatus.readPosInBytes);
+ lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
+
+ }
+ // check that the next record will fit in the batch
+ // (fixed widths are in bits, so the variable length byte total is multiplied by 8)
+ // NOTE(review): only the variable length bytes of the current record are counted here, not those
+ // already copied for earlier records in this pass - confirm this is the intended bound.
+ if (rowGroupFinished || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + lengthVarFieldsInCurrentRecord * 8
+ > parentReader.getBatchSize()){
+ break;
+ }
+ else{
+ recordsReadInCurrentPass++;
+ }
+ // Phase 2: the record fits, so copy one value per column and advance the read positions.
+ for (ColumnReader columnReader : columns) {
+ if (columnReader.isFixedLength) {
+ continue;
+ }
+ bytes = columnReader.pageReadStatus.pageDataByteArray;
+ currVec = (VarBinaryVector) columnReader.valueVecHolder.getValueVector();
+ // again, I am re-purposing the unused field here, it is a length in BYTES, not bits
+ // bytesReadInCurrentPass includes the 4 byte length prefix of every value read so far;
+ // subtracting 4 per value leaves the end offset of this value within the data buffer
+ currVec.getAccessor().getOffsetVector().getData().writeInt((int) columnReader.bytesReadInCurrentPass +
+ columnReader.dataTypeLengthInBits - 4 * (int) columnReader.valuesReadInCurrentPass);
+ // skip the 4 byte length prefix and copy only the value bytes
+ currVec.getData().writeBytes(bytes, (int) columnReader.pageReadStatus.readPosInBytes + 4,
+ columnReader.dataTypeLengthInBits);
+ columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
+ columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
+ columnReader.pageReadStatus.valuesRead++;
+ columnReader.valuesReadInCurrentPass++;
+ currVec.getMutator().setValueCount((int)recordsReadInCurrentPass);
+ // reached the end of a page
+ // NOTE(review): the result of next() is ignored here; running out of pages is only noticed by
+ // the check at the top of the next iteration - confirm that is sufficient.
+ if ( columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
+ columnReader.pageReadStatus.next();
+ }
+ }
+ } while (recordsReadInCurrentPass < recordsToReadInThisPass);
+ return recordsReadInCurrentPass;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index 4bfab47..54a6cb8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -7,7 +7,7 @@ import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.record.DeadBuf;
import org.apache.drill.exec.record.MaterializedField;
-abstract class BaseDataValueVector extends BaseValueVector{
+public abstract class BaseDataValueVector extends BaseValueVector{
protected ByteBuf data = DeadBuf.DEAD_BUFFER;
protected int valueCount;
@@ -47,6 +47,10 @@ abstract class BaseDataValueVector extends BaseValueVector{
public FieldMetadata getMetadata() {
return null;
}
+
+ public ByteBuf getData(){
+ return data;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
index 9fd33b9..2c86406 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
@@ -91,9 +91,7 @@ public abstract class AbstractFragmentRunnerListener implements FragmentRunnerLi
}
protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
-
-
-
+
@Override
public final void fail(FragmentHandle handle, String message, Throwable excep) {
FragmentStatus.Builder status = getBuilder(FragmentState.FAILED);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
new file mode 100644
index 0000000..93f1af7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+
+/**
+ * Informs remote node as fragment changes state.
+ */
+public class RemoteFragmentRunnerListener extends AbstractFragmentRunnerListener{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteFragmentRunnerListener.class);
+
+  // Tunnel over which every status update is sent.
+  private final BitTunnel tunnel;
+
+  /**
+   * @param context context of the fragment being observed, passed to the base listener
+   * @param tunnel tunnel used to deliver status updates to the remote node
+   */
+  public RemoteFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) {
+    super(context);
+    this.tunnel = tunnel;
+  }
+
+  /**
+   * Forwards a fragment status change over the tunnel.
+   *
+   * @param handle handle of the fragment whose status changed (not used here)
+   * @param status the new status to send
+   */
+  @Override
+  protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+    // This hook takes an arbitrary status, so the message must not claim it is a failure.
+    logger.debug("Sending remote status update: {}", status);
+    tunnel.sendFragmentStatus(status);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
deleted file mode 100644
index ef7bcb1..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.work;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.Builder;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
-import org.apache.drill.exec.rpc.bit.BitTunnel;
-import org.apache.drill.exec.work.foreman.ErrorHelper;
-
-/**
- * Informs remote node as fragment changes state.
- */
-public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListener{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemotingFragmentRunnerListener.class);
-
- private final BitTunnel tunnel;
-
- public RemotingFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) {
- super(context);
- this.tunnel = tunnel;
- }
-
-
- @Override
- protected void statusChange(FragmentHandle handle, FragmentStatus status) {
- logger.debug("Sending remote failure.");
- tunnel.sendFragmentStatus(status);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
index 2829dfd..b6e0159 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -48,10 +48,10 @@ import org.apache.drill.exec.rpc.bit.BitConnection;
import org.apache.drill.exec.rpc.bit.BitRpcConfig;
import org.apache.drill.exec.rpc.bit.BitTunnel;
import org.apache.drill.exec.work.FragmentRunner;
-import org.apache.drill.exec.work.RemotingFragmentRunnerListener;
+import org.apache.drill.exec.work.RemoteFragmentRunnerListener;
+import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
-import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
import com.google.common.collect.Maps;
import com.google.protobuf.MessageLite;
@@ -116,7 +116,7 @@ public class BitComHandlerImpl implements BitComHandler {
logger.debug("Received remote fragment start instruction", fragment);
FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, null,new FunctionImplementationRegistry(bee.getContext().getConfig()));
BitTunnel tunnel = bee.getContext().getBitCom().getTunnel(fragment.getForeman());
- RemotingFragmentRunnerListener listener = new RemotingFragmentRunnerListener(context, tunnel);
+ RemoteFragmentRunnerListener listener = new RemoteFragmentRunnerListener(context, tunnel);
try{
FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
RootExec exec = ImplCreator.getExec(context, rootOperator);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 0a4614a..c9c23b5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -166,7 +166,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json);
logger.debug("Logical {}", logicalPlan.unparse(DrillConfig.create()));
PhysicalPlan physicalPlan = convert(logicalPlan);
- logger.debug("Physical {}", new ObjectMapper().writeValueAsString(physicalPlan));
+ //logger.debug("Physical {}", new ObjectMapper().writeValueAsString(physicalPlan));
runPhysicalPlan(physicalPlan);
} catch (IOException e) {
fail("Failure while parsing logical plan.", e);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
index e4d0cfc..d63b4f4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -36,7 +36,7 @@ import org.apache.drill.exec.rpc.bit.BitTunnel;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.FragmentRunner;
import org.apache.drill.exec.work.FragmentRunnerListener;
-import org.apache.drill.exec.work.RemotingFragmentRunnerListener;
+import org.apache.drill.exec.work.RemoteFragmentRunnerListener;
import org.apache.drill.exec.work.batch.IncomingBuffers;
/**
@@ -58,7 +58,7 @@ public class RemoteFragmentHandler implements IncomingFragmentHandler {
this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
this.buffers = new IncomingBuffers(root);
this.context = new FragmentContext(context, fragment.getHandle(), null, buffers, new FunctionImplementationRegistry(context.getConfig()));
- this.runnerListener = new RemotingFragmentRunnerListener(this.context, foremanTunnel);
+ this.runnerListener = new RemoteFragmentRunnerListener(this.context, foremanTunnel);
this.reader = context.getPlanReader();
}catch(IOException e){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/sandbox/prototype/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
new file mode 100644
index 0000000..72322d5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package parquet.hadoop;
+
+import com.sun.corba.se.impl.interceptors.CodecFactoryImpl;
+import org.apache.hadoop.conf.Configuration;
+import parquet.bytes.BytesInput;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+
+/**
+ * Thin wrapper around CodecFactory that offers public accessors for it. It is declared in
+ * the parquet.hadoop package, presumably because CodecFactory is not visible from outside
+ * that package - confirm against the Parquet version in use.
+ */
+public class CodecFactoryExposer{
+
+  private CodecFactory codecFactory;
+
+  /**
+   * @param config Hadoop configuration handed to the wrapped CodecFactory
+   */
+  public CodecFactoryExposer(Configuration config){
+    this.codecFactory = new CodecFactory(config);
+  }
+
+  /** Returns the wrapped codec factory. */
+  public CodecFactory getCodecFactory() {
+    return this.codecFactory;
+  }
+
+  /**
+   * Decompresses a block of bytes with the decompressor registered for the given codec.
+   *
+   * @param bytes the compressed input
+   * @param uncompressedSize the size passed through to the decompressor
+   * @param codecName the codec the input was compressed with
+   * @return the decompressed bytes
+   * @throws IOException if the decompressor fails
+   */
+  public BytesInput decompress(BytesInput bytes, int uncompressedSize, CompressionCodecName codecName) throws IOException {
+    return this.codecFactory
+        .getDecompressor(codecName)
+        .decompress(bytes, uncompressedSize);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
index a543197..a590420 100644
--- a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
+++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
@@ -8,17 +8,18 @@ drill.exec: {
cluster-id: "drillbits1"
rpc: {
user.port : 31010,
- bit.port : 31011
+ bit.port : 31011,
+ use.ip : false
},
optimizer: {
implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
},
zk: {
- connect: "localhost:2181",
+ connect: "10.10.30.52:5181",
root: "/drill",
refresh: 500,
- timeout: 1000,
+ timeout: 5000,
retry: {
count: 7200,
delay: 500
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java
index 1513c99..f6d83cc 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java
@@ -21,13 +21,17 @@ import static org.junit.Assert.*;
import java.util.List;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.ValueVector;
import org.junit.Test;
import com.google.common.base.Charsets;
@@ -37,7 +41,6 @@ import com.google.common.io.Files;
public class TestDistributedFragmentRun extends PopUnitTestBase{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestDistributedFragmentRun.class);
-
@Test
public void oneBitOneExchangeOneEntryRun() throws Exception{
RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ByteArrayUtil.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ByteArrayUtil.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ByteArrayUtil.java
new file mode 100644
index 0000000..ea1404b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ByteArrayUtil.java
@@ -0,0 +1,181 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+//TODO - make sure we figure out the license on these
+/**
+ * Test helper that converts primitives and primitive arrays into byte arrays.
+ *
+ * Byte order differs per type: int, long, float and double are written least significant
+ * byte first (little-endian), while short and char are written most significant byte first.
+ */
+public class ByteArrayUtil {
+
+ /**
+  * Converts a boxed Integer, Double, Float, Boolean or Long by dispatching to the matching
+  * primitive overload.
+  *
+  * @param data the boxed value to convert
+  * @return the bytes produced by the matching primitive overload
+  * @throws Exception if data is null or of any other type (including Short, Byte, Character
+  *         and arrays)
+  */
+ public static byte[] toByta(Object data) throws Exception {
+ if (data instanceof Integer) return toByta((int) data);
+ else if (data instanceof Double) return toByta((double) data);
+ else if (data instanceof Float) return toByta((float) data);
+ else if (data instanceof Boolean) return toByta((boolean) data);
+ else if (data instanceof Long) return toByta((long) data);
+ else throw new Exception("Cannot convert that type to a byte array.");
+ }
+
+ // found at http://www.daniweb.com/software-development/java/code/216874/primitive-types-as-byte-arrays
+ // I have modified them to switch the endianness of integers and longs
+ /* ========================= */
+ /* "primitive type --> byte[] data" Methods */
+ /* ========================= */
+ /** Wraps a single byte in a one element array. */
+ public static byte[] toByta(byte data) {
+ return new byte[]{data};
+ }
+
+ /** Returns the given array itself; no copy is made. */
+ public static byte[] toByta(byte[] data) {
+ return data;
+ }
+
+ /* ========================= */
+ /** Encodes a short as 2 bytes, most significant byte first. */
+ public static byte[] toByta(short data) {
+ return new byte[]{
+ (byte) ((data >> 8) & 0xff),
+ (byte) ((data >> 0) & 0xff),
+ };
+ }
+
+ /** Encodes each short as 2 bytes, in array order; returns null for a null array. */
+ public static byte[] toByta(short[] data) {
+ if (data == null) return null;
+ // ----------
+ byte[] byts = new byte[data.length * 2];
+ for (int i = 0; i < data.length; i++)
+ System.arraycopy(toByta(data[i]), 0, byts, i * 2, 2);
+ return byts;
+ }
+
+ /* ========================= */
+ /** Encodes a char as 2 bytes, most significant byte first. */
+ public static byte[] toByta(char data) {
+ return new byte[]{
+ (byte) ((data >> 8) & 0xff),
+ (byte) ((data >> 0) & 0xff),
+ };
+ }
+
+ /** Encodes each char as 2 bytes, in array order; returns null for a null array. */
+ public static byte[] toByta(char[] data) {
+ if (data == null) return null;
+ // ----------
+ byte[] byts = new byte[data.length * 2];
+ for (int i = 0; i < data.length; i++)
+ System.arraycopy(toByta(data[i]), 0, byts, i * 2, 2);
+ return byts;
+ }
+
+ /* ========================= */
+ /** Encodes an int as 4 bytes, least significant byte first (little-endian). */
+ public static byte[] toByta(int data) {
+ return new byte[]{
+ (byte) ((data >> 0) & 0xff),
+ (byte) ((data >> 8) & 0xff),
+ (byte) ((data >> 16) & 0xff),
+ (byte) ((data >> 24) & 0xff),
+ };
+ }
+
+ /** Encodes each int as 4 little-endian bytes, in array order; returns null for a null array. */
+ public static byte[] toByta(int[] data) {
+ if (data == null) return null;
+ // ----------
+ byte[] byts = new byte[data.length * 4];
+ for (int i = 0; i < data.length; i++)
+ System.arraycopy(toByta(data[i]), 0, byts, i * 4, 4);
+ return byts;
+ }
+
+ /* ========================= */
+ /** Encodes a long as 8 bytes, least significant byte first (little-endian). */
+ public static byte[] toByta(long data) {
+ return new byte[]{
+ (byte) ((data >> 0) & 0xff),
+ (byte) ((data >> 8) & 0xff),
+ (byte) ((data >> 16) & 0xff),
+ (byte) ((data >> 24) & 0xff),
+ (byte) ((data >> 32) & 0xff),
+ (byte) ((data >> 40) & 0xff),
+ (byte) ((data >> 48) & 0xff),
+ (byte) ((data >> 56) & 0xff),
+ };
+ }
+
+ /** Encodes each long as 8 little-endian bytes, in array order; returns null for a null array. */
+ public static byte[] toByta(long[] data) {
+ if (data == null) return null;
+ // ----------
+ byte[] byts = new byte[data.length * 8];
+ for (int i = 0; i < data.length; i++)
+ System.arraycopy(toByta(data[i]), 0, byts, i * 8, 8);
+ return byts;
+ }
+
+ /* ========================= */
+ /** Encodes the raw IEEE 754 bits of a float using the int overload (4 bytes, little-endian). */
+ public static byte[] toByta(float data) {
+ return toByta(Float.floatToRawIntBits(data));
+ }
+
+ /** Encodes each float as 4 bytes, in array order; returns null for a null array. */
+ public static byte[] toByta(float[] data) {
+ if (data == null) return null;
+ // ----------
+ byte[] byts = new byte[data.length * 4];
+ for (int i = 0; i < data.length; i++)
+ System.arraycopy(toByta(data[i]), 0, byts, i * 4, 4);
+ return byts;
+ }
+
+ /* ========================= */
+ /** Encodes the raw IEEE 754 bits of a double using the long overload (8 bytes, little-endian). */
+ public static byte[] toByta(double data) {
+ return toByta(Double.doubleToRawLongBits(data));
+ }
+
+ /** Encodes each double as 8 bytes, in array order; returns null for a null array. */
+ public static byte[] toByta(double[] data) {
+ if (data == null) return null;
+ // ----------
+ byte[] byts = new byte[data.length * 8];
+ for (int i = 0; i < data.length; i++)
+ System.arraycopy(toByta(data[i]), 0, byts, i * 8, 8);
+ return byts;
+ }
+
+ /* ========================= */
+ /** Encodes a boolean as a single byte: 1 for true, 0 for false. */
+ public static byte[] toByta(boolean data) {
+ return new byte[]{(byte) (data ? 0x01 : 0x00)}; // bool -> {1 byte}
+ }
+
+ /**
+  * Packs a boolean array into a 4 byte length header (encoded by the int overload) followed
+  * by the values as bits, 8 per byte, filling each byte from its highest bit down.
+  * Returns null for a null array.
+  */
+ public static byte[] toByta(boolean[] data) {
+ // Advanced Technique: The byte array contains information
+ // about how many boolean values are involved, so the exact
+ // array is returned when later decoded.
+ // ----------
+ if (data == null) return null;
+ // ----------
+ int len = data.length;
+ byte[] lena = toByta(len); // int conversion; length array = lena
+ byte[] byts = new byte[lena.length + (len / 8) + (len % 8 != 0 ? 1 : 0)];
+ // (Above) length-array-length + sets-of-8-booleans +? byte-for-remainder
+ System.arraycopy(lena, 0, byts, 0, lena.length);
+ // ----------
+ // (Below) algorithm by Matthew Cudmore: boolean[] -> bits -> byte[]
+ // i walks the input, j is the output byte being filled, k is the next bit position (7 down to 0)
+ for (int i = 0, j = lena.length, k = 7; i < data.length; i++) {
+ byts[j] |= (data[i] ? 1 : 0) << k--;
+ if (k < 0) {
+ j++;
+ k = 7;
+ }
+ }
+ // ----------
+ return byts;
+ }
+
+ // above utility methods found here:
+ // http://www.daniweb.com/software-development/java/code/216874/primitive-types-as-byte-arrays
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 6b353ae..c9d6967 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -52,6 +52,11 @@ public class JSONRecordReaderTest {
}
@Override
+ public void removeAllFields() { // overrides a supertype method declared outside this hunk
+ addFields.clear(); // empties addFields, which is declared elsewhere in the enclosing class (presumably the fields added so far -- confirm)
+ }
+
+ @Override
public void setNewSchema() throws SchemaChangeException { // body is empty: this override does nothing
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
new file mode 100644
index 0000000..f9a1ecb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.store.parquet.ParquetStorageEngine;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+import parquet.bytes.BytesInput;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.Page;
+import parquet.column.page.PageReadStore;
+import parquet.column.page.PageReader;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.*;
+import static parquet.column.Encoding.PLAIN;
+
+
+/**
+ * End-to-end check that submits the physical plan in {@code /mock-scan.json} through a
+ * {@link DrillClient} and waits for the final result batch.
+ *
+ * <p>No Drillbit is started by this test, so {@code new DrillClient(config)} presumably
+ * relies on an already running Drillbit that it can locate from the configuration --
+ * TODO confirm. To run against an embedded Drillbit instead, create
+ * {@code new Drillbit(config, serviceSet)} together with
+ * {@code new DrillClient(config, serviceSet.getCoordinator())} in the try-with-resources
+ * header and call {@code run()} on the Drillbit before connecting.
+ */
+public class MockScantTest {
+  // Bound to this class; the original pointed at StorageEngineRegistry by copy-paste.
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScantTest.class);
+
+  /**
+   * Lets the test thread block until the query ends, either with its last result chunk or
+   * with a submission failure.
+   */
+  private static final class ParquetResultListener implements UserResultsListener {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    /** Failure reported by the client, or {@code null} if none; set before the latch opens. */
+    private volatile RpcException failure;
+
+    @Override
+    public void submissionFailed(RpcException ex) {
+      // Remember the cause so await() can fail the test instead of returning normally.
+      failure = ex;
+      latch.countDown();
+    }
+
+    @Override
+    public void resultArrived(QueryResultBatch result) {
+      if (result.getHeader().getIsLastChunk()) {
+        latch.countDown();
+      }
+      result.getData().release(1);
+    }
+
+    /**
+     * Waits for the query to finish.
+     *
+     * @throws AssertionError if the submission failed, so a failure is not mistaken for success
+     * @throws Exception if the wait is interrupted
+     */
+    public void await() throws Exception {
+      latch.await();
+      if (failure != null) {
+        throw new AssertionError("Query submission failed", failure);
+      }
+    }
+  }
+
+  /**
+   * Runs the mock-scan physical plan and prints how long the query took, measured from just
+   * after the client connects until the listener is released.
+   *
+   * @throws Exception if connecting, reading the plan, or waiting for results fails
+   */
+  @Test
+  public void testMockScanFullEngine() throws Exception {
+    // Not used by the client below; the call is kept because any side effects of
+    // getLocalServiceSet() are not visible from this file.
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    DrillConfig config = DrillConfig.create();
+
+    try (DrillClient client = new DrillClient(config)) {
+      client.connect();
+      long queryStart = System.nanoTime();
+      String plan = Files.toString(FileUtils.getResourceAsFile("/mock-scan.json"), Charsets.UTF_8);
+      ParquetResultListener listener = new ParquetResultListener();
+      client.runQuery(UserProtos.QueryType.PHYSICAL, plan, listener);
+      listener.await();
+      long queryEnd = System.nanoTime();
+      System.out.println(String.format("Took %f s to run query", (queryEnd - queryStart) / 1E9));
+    }
+  }
+}