Posted to commits@drill.apache.org by pa...@apache.org on 2016/11/05 00:11:29 UTC
[02/10] drill git commit: DRILL-4800: Add AsyncPageReader to pipeline
DRILL-4800: Add AsyncPageReader to pipeline PageRead

Use a non-tracking input stream for Parquet scans. Make the choice between
async and sync reader configurable. Make various options user configurable:
choose between sync and async page reader, enable/disable fadvise. Add
Parquet scan metrics to track time spent in various operations.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f9a443d8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f9a443d8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f9a443d8
Branch: refs/heads/master
Commit: f9a443d8a3d8e81b7e76f161b611003d16a53a4d
Parents: fe2334e
Author: Parth Chandra <pa...@apache.org>
Authored: Tue Sep 27 14:03:35 2016 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 2 17:57:46 2016 -0700
----------------------------------------------------------------------
.../src/resources/drill-override-example.conf | 4 +
.../org/apache/drill/exec/ExecConstants.java | 15 +
.../apache/drill/exec/ops/OperatorContext.java | 6 +
.../drill/exec/ops/OperatorContextImpl.java | 21 +-
.../drill/exec/ops/OperatorMetricRegistry.java | 2 +
.../apache/drill/exec/ops/OperatorStats.java | 16 +-
.../drill/exec/server/BootStrapContext.java | 33 +-
.../drill/exec/server/DrillbitContext.java | 6 +
.../server/options/SystemOptionManager.java | 5 +-
.../exec/store/parquet/ParquetReaderStats.java | 42 +--
.../store/parquet/ParquetScanBatchCreator.java | 14 +-
.../parquet/columnreaders/AsyncPageReader.java | 332 +++++++++++++++++++
.../parquet/columnreaders/ColumnReader.java | 36 +-
.../store/parquet/columnreaders/PageReader.java | 160 +++++----
.../columnreaders/ParquetRecordReader.java | 102 +++++-
.../columnreaders/VarLenBinaryReader.java | 6 +-
.../BufferedDirectBufInputStream.java | 51 ++-
.../src/main/resources/drill-module.conf | 4 +
.../src/main/resources/rest/profile/profile.ftl | 2 +-
19 files changed, 684 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index 52949db..4be4aa2 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -166,6 +166,10 @@ drill.exec: {
initial: 20000000
}
},
+ scan: {
+ threadpool_size: 8,
+ decode_threadpool_size: 1
+ },
debug.error_on_leak: true
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index ba6b084..a13fd71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -96,6 +96,10 @@ public interface ExecConstants {
/** Size of JDBC batch queue (in batches) above which throttling begins. */
String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
"drill.jdbc.batch_queue_throttling_threshold";
+ // Thread pool size for scan threads. Used by the Parquet scan.
+ String SCAN_THREADPOOL_SIZE = "drill.exec.scan.threadpool_size";
+ // The size of the thread pool used by a scan to decode the data. Used by Parquet
+ String SCAN_DECODE_THREADPOOL_SIZE = "drill.exec.scan.decode_threadpool_size";
/**
* Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or
@@ -147,10 +151,21 @@ public interface ExecConstants {
String PARQUET_READER_INT96_AS_TIMESTAMP = "store.parquet.reader.int96_as_timestamp";
OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP, false);
+ String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
+ OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC, true);
+
// Use a buffering reader for parquet page reader
String PARQUET_PAGEREADER_USE_BUFFERED_READ = "store.parquet.reader.pagereader.bufferedread";
OptionValidator PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_USE_BUFFERED_READ, true);
+ // Size in bytes of the buffer the Parquet page reader will use to read from disk. Default is 4 MiB
+ String PARQUET_PAGEREADER_BUFFER_SIZE = "store.parquet.reader.pagereader.buffersize";
+ OptionValidator PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR = new LongValidator(PARQUET_PAGEREADER_BUFFER_SIZE, 4*1024*1024);
+
+ // try to use fadvise if available
+ String PARQUET_PAGEREADER_USE_FADVISE = "store.parquet.reader.pagereader.usefadvise";
+ OptionValidator PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_USE_FADVISE, false);
+
OptionValidator COMPILE_SCALAR_REPLACEMENT = new BooleanValidator("exec.compile.scalar_replacement", false);
String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
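
For reference, a minimal sketch (not from the commit) of how these options are
consulted at run time; it mirrors the ParquetScanBatchCreator and PageReader
changes further down, with options standing for the OptionManager exposed by
the fragment context:

    import org.apache.drill.exec.ExecConstants;
    import org.apache.drill.exec.server.options.OptionManager;

    // Sketch (not from the commit): reading the new page-reader options.
    class PageReaderOptionsSketch {
      // bool_val and num_val are the public value fields on OptionValue,
      // exactly as the hunks below use them.
      static void read(OptionManager options) {
        boolean async    = options.getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
        boolean buffered = options.getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
        int bufferSize   = options.getOption(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE).num_val.intValue();
        boolean fadvise  = options.getOption(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE).bool_val;
      }
    }
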
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 33fa288..92a7269 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -47,10 +47,16 @@ public abstract class OperatorContext {
public abstract ExecutorService getExecutor();
+ public abstract ExecutorService getScanExecutor();
+
+ public abstract ExecutorService getScanDecodeExecutor();
+
public abstract ExecutionControls getExecutionControls();
public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;
+ public abstract DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException;
+
/**
* Run the callable as the given proxy user.
*
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 85f0ccb..38ddd16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -50,6 +50,8 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
private final BufferManager manager;
private DrillFileSystem fs;
private final ExecutorService executor;
+ private final ExecutorService scanExecutor;
+ private final ExecutorService scanDecodeExecutor;
/**
* This lazily initialized executor service is used to submit a {@link Callable task} that needs a proxy user. There
@@ -70,6 +72,8 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
stats = context.getStats().newOperatorStats(def, allocator);
executionControls = context.getExecutionControls();
executor = context.getDrillbitContext().getExecutor();
+ scanExecutor = context.getDrillbitContext().getScanExecutor();
+ scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
}
public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
@@ -81,6 +85,8 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
this.stats = stats;
executionControls = context.getExecutionControls();
executor = context.getDrillbitContext().getExecutor();
+ scanExecutor = context.getDrillbitContext().getScanExecutor();
+ scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
}
public DrillBuf replace(DrillBuf old, int newSize) {
@@ -95,10 +101,16 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
return manager.getManagedBuffer(size);
}
- // Allow and operator to use the thread pool
+ // Allow an operator to use the thread pool
public ExecutorService getExecutor() {
return executor;
}
+ public ExecutorService getScanExecutor() {
+ return scanExecutor;
+ }
+ public ExecutorService getScanDecodeExecutor() {
+ return scanDecodeExecutor;
+ }
public ExecutionControls getExecutionControls() {
return executionControls;
@@ -179,4 +191,11 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
return fs;
}
+ @Override
+ public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
+ Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
+ fs = new DrillFileSystem(conf, null);
+ return fs;
+ }
+
}
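
The non-tracking variant differs only in the second constructor argument:
passing null instead of the operator's stats yields a DrillFileSystem whose
I/O is not counted against the operator, which is what the commit message
means by a non-tracking input stream (the reads now happen on scan-pool
threads, where per-operator wait tracking would be misleading). Roughly, with
operatorStats standing in for whatever the tracking variant passes (an
assumption; that call is not shown in this hunk):

    // Sketch: tracking vs. non-tracking construction (first line is assumed form).
    DrillFileSystem tracking    = new DrillFileSystem(conf, operatorStats); // I/O counted against the operator
    DrillFileSystem nonTracking = new DrillFileSystem(conf, null);          // as in newNonTrackingFileSystem above
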
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index b704bb6..0424332 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
/**
@@ -47,6 +48,7 @@ public class OperatorMetricRegistry {
register(CoreOperatorType.HASH_AGGREGATE_VALUE, HashAggTemplate.Metric.class);
register(CoreOperatorType.HASH_JOIN_VALUE, HashJoinBatch.Metric.class);
register(CoreOperatorType.EXTERNAL_SORT_VALUE, ExternalSortBatch.Metric.class);
+ register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, ParquetRecordReader.Metric.class);
}
private static void register(final int operatorType, final Class<? extends MetricDef> metricDef) {
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index 271f734..b565774 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -131,7 +131,7 @@ public class OperatorStats {
/**
* Clear stats
*/
- public void clear() {
+ public synchronized void clear() {
processingNanos = 0l;
setupNanos = 0l;
waitNanos = 0l;
@@ -139,47 +139,47 @@ public class OperatorStats {
doubleMetrics.clear();
}
- public void startSetup() {
+ public synchronized void startSetup() {
assert !inSetup : assertionError("starting setup");
stopProcessing();
inSetup = true;
setupMark = System.nanoTime();
}
- public void stopSetup() {
+ public synchronized void stopSetup() {
assert inSetup : assertionError("stopping setup");
startProcessing();
setupNanos += System.nanoTime() - setupMark;
inSetup = false;
}
- public void startProcessing() {
+ public synchronized void startProcessing() {
assert !inProcessing : assertionError("starting processing");
processingMark = System.nanoTime();
inProcessing = true;
}
- public void stopProcessing() {
+ public synchronized void stopProcessing() {
assert inProcessing : assertionError("stopping processing");
processingNanos += System.nanoTime() - processingMark;
inProcessing = false;
}
- public void startWait() {
+ public synchronized void startWait() {
assert !inWait : assertionError("starting waiting");
stopProcessing();
inWait = true;
waitMark = System.nanoTime();
}
- public void stopWait() {
+ public synchronized void stopWait() {
assert inWait : assertionError("stopping waiting");
startProcessing();
waitNanos += System.nanoTime() - waitMark;
inWait = false;
}
- public void batchReceived(int inputIndex, long records, boolean newSchema) {
+ public synchronized void batchReceived(int inputIndex, long records, boolean newSchema) {
recordsReceivedByInput[inputIndex] += records;
batchesReceivedByInput[inputIndex]++;
if(newSchema){
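
Making these methods synchronized protects the counters now that the async
page reader adds threads that can report stats concurrently with the fragment
thread. The wait-time accounting layered on top uses the AtomicLong-based
ParquetReaderStats (see that hunk further down) and follows this pattern,
condensed from AsyncPageReader.nextInternal() below:

    // Sketch, condensed from AsyncPageReader below: time how long the consumer
    // blocks on the async read, then credit the consumer's wait and the
    // producer's disk time to separate counters.
    Stopwatch timer = Stopwatch.createStarted();
    ReadStatus readStatus = asyncPageRead.get();                  // block until the scan thread delivers
    long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
    stats.timeDiskScanWait.addAndGet(timeBlocked);                // consumer-side wait
    stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());   // producer-side disk time
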
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 6554e33..adb6323 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -20,11 +20,11 @@ package org.apache.drill.exec.server;
import com.codahale.metrics.MetricRegistry;
import io.netty.channel.EventLoopGroup;
+import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.SynchronousQueue;
import org.apache.drill.common.DrillAutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -37,6 +37,7 @@ import org.apache.drill.exec.rpc.TransportCheck;
public class BootStrapContext implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BootStrapContext.class);
+ private static final int MIN_SCAN_THREADPOOL_SIZE = 8; // Magic num
private final DrillConfig config;
private final EventLoopGroup loop;
@@ -45,12 +46,15 @@ public class BootStrapContext implements AutoCloseable {
private final BufferAllocator allocator;
private final ScanResult classpathScan;
private final ExecutorService executor;
+ private final ExecutorService scanExecutor;
+ private final ExecutorService scanDecodeExecutor;
public BootStrapContext(DrillConfig config, ScanResult classpathScan) {
this.config = config;
this.classpathScan = classpathScan;
this.loop = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitServer-");
- this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitClient-");
+ this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS),
+ "BitClient-");
// Note that metrics are stored in a static instance
this.metrics = DrillMetrics.getRegistry();
this.allocator = RootAllocatorFactory.newRoot(config);
@@ -65,12 +69,35 @@ public class BootStrapContext implements AutoCloseable {
super.afterExecute(r, t);
}
};
+ // Set up two thread pools: one for reading raw data from disk and another for decoding the data.
+ // A good guideline is for the number of threads in the scan pool to be a multiple (fractional
+ // numbers are ok) of the number of disks.
+ // A good guideline is for the number of threads in the decode pool to be a small multiple (fractional
+ // numbers are ok) of the number of cores.
+ final int numCores = Runtime.getRuntime().availableProcessors();
+ final int numScanThreads = (int) (config.getDouble(ExecConstants.SCAN_THREADPOOL_SIZE));
+ final int numScanDecodeThreads = (int) config.getDouble(ExecConstants.SCAN_DECODE_THREADPOOL_SIZE);
+ final int scanThreadPoolSize =
+ MIN_SCAN_THREADPOOL_SIZE > numScanThreads ? MIN_SCAN_THREADPOOL_SIZE : numScanThreads;
+ final int scanDecodeThreadPoolSize = numCores > numScanDecodeThreads ? numCores : numScanDecodeThreads;
+
+ this.scanExecutor = Executors.newFixedThreadPool(scanThreadPoolSize, new NamedThreadFactory("scan-"));
+ this.scanDecodeExecutor =
+ Executors.newFixedThreadPool(scanDecodeThreadPoolSize, new NamedThreadFactory("scan-decode-"));
}
public ExecutorService getExecutor() {
return executor;
}
+ public ExecutorService getScanExecutor() {
+ return scanExecutor;
+ }
+
+ public ExecutorService getScanDecodeExecutor() {
+ return scanDecodeExecutor;
+ }
+
public DrillConfig getConfig() {
return config;
}
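
The effective pool sizes are the configured values with floors applied: the
scan pool never drops below MIN_SCAN_THREADPOOL_SIZE (8), and the decode pool
never drops below the number of cores. The ternaries above are equivalent to:

    // Sketch: the sizing rule rewritten with Math.max for clarity.
    int numCores      = Runtime.getRuntime().availableProcessors();
    int scanThreads   = Math.max(MIN_SCAN_THREADPOOL_SIZE,
        (int) config.getDouble(ExecConstants.SCAN_THREADPOOL_SIZE));
    int decodeThreads = Math.max(numCores,
        (int) config.getDouble(ExecConstants.SCAN_DECODE_THREADPOOL_SIZE));
    // e.g. with the drill-override-example.conf values (8 and 1) on a 16-core
    // machine: scanThreads = max(8, 8) = 8, decodeThreads = max(16, 1) = 16.
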
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 3eb87ea..ffe6c28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -172,6 +172,12 @@ public class DrillbitContext implements AutoCloseable {
public ExecutorService getExecutor() {
return context.getExecutor();
}
+ public ExecutorService getScanExecutor() {
+ return context.getScanExecutor();
+ }
+ public ExecutorService getScanDecodeExecutor() {
+ return context.getScanDecodeExecutor();
+ }
public LogicalPlanPersistence getLpPersistence() {
return lpPersistence;
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 8b67fdb..1981d24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -99,8 +99,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR,
ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR,
ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
- ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
+ ExecConstants.PARQUET_PAGEREADER_ASYNC_VALIDATOR,
+ ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
+ ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR,
+ ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR,
ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR,
ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR,
ExecConstants.ENABLE_UNION_TYPE,
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
index e95b0c8..c2711cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
@@ -17,28 +17,30 @@
*/
package org.apache.drill.exec.store.parquet;
+import java.util.concurrent.atomic.AtomicLong;
+
public class ParquetReaderStats {
- public long numDictPageHeaders;
- public long numPageHeaders;
- public long numDictPageLoads;
- public long numPageLoads;
- public long numDictPagesDecompressed;
- public long numPagesDecompressed;
-
- public long totalDictPageHeaderBytes;
- public long totalPageHeaderBytes;
- public long totalDictPageReadBytes;
- public long totalPageReadBytes;
- public long totalDictDecompressedBytes;
- public long totalDecompressedBytes;
-
- public long timeDictPageHeaders;
- public long timePageHeaders;
- public long timeDictPageLoads;
- public long timePageLoads;
- public long timeDictPagesDecompressed;
- public long timePagesDecompressed;
+ public AtomicLong numDictPageLoads = new AtomicLong();
+ public AtomicLong numDataPageLoads = new AtomicLong();
+ public AtomicLong numDataPagesDecoded = new AtomicLong();
+ public AtomicLong numDictPagesDecompressed = new AtomicLong();
+ public AtomicLong numDataPagesDecompressed = new AtomicLong();
+
+ public AtomicLong totalDictPageReadBytes = new AtomicLong();
+ public AtomicLong totalDataPageReadBytes = new AtomicLong();
+ public AtomicLong totalDictDecompressedBytes = new AtomicLong();
+ public AtomicLong totalDataDecompressedBytes = new AtomicLong();
+
+ public AtomicLong timeDictPageLoads = new AtomicLong();
+ public AtomicLong timeDataPageLoads = new AtomicLong();
+ public AtomicLong timeDataPageDecode = new AtomicLong();
+ public AtomicLong timeDictPageDecode = new AtomicLong();
+ public AtomicLong timeDictPagesDecompressed = new AtomicLong();
+ public AtomicLong timeDataPagesDecompressed = new AtomicLong();
+
+ public AtomicLong timeDiskScanWait = new AtomicLong();
+ public AtomicLong timeDiskScan = new AtomicLong();
public ParquetReaderStats() {
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index a98c660..a14bab5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -73,10 +73,18 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
DrillFileSystem fs;
try {
- fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
- } catch(IOException e) {
- throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
+ boolean useAsyncPageReader =
+ context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
+ if (useAsyncPageReader) {
+ fs = oContext.newNonTrackingFileSystem(rowGroupScan.getStorageEngine().getFsConf());
+ } else {
+ fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
+ }
+ } catch (IOException e) {
+ throw new ExecutionSetupException(
+ String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
}
+
Configuration conf = new Configuration(fs.getConf());
conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
new file mode 100644
index 0000000..3f47f04
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+ private ExecutorService threadPool;
+ private Future<ReadStatus> asyncPageRead;
+
+ AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
+ ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
+ super(parentStatus, fs, path, columnChunkMetaData);
+ if (threadPool == null) {
+ threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+ }
+ asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+ }
+
+ @Override protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
+ final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
+ if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+ try {
+ dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos());
+ } catch (IOException e) {
+ handleAndThrowException(e, "Error Reading dictionary page.");
+ }
+ // parent constructor may call this method before the thread pool is set.
+ if (threadPool == null) {
+ threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+ }
+ asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+ readDictionaryPage(asyncPageRead, parentStatus);
+ asyncPageRead = null; // reset after consuming
+ }
+ }
+
+ private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
+ DrillBuf data;
+ boolean isDictionary = false;
+ synchronized (this) {
+ data = readStatus.getPageData();
+ readStatus.setPageData(null);
+ isDictionary = readStatus.isDictionaryPage;
+ }
+ if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
+ DrillBuf uncompressedData = data;
+ data = decompress(readStatus.getPageHeader(), uncompressedData);
+ synchronized (this) {
+ readStatus.setPageData(null);
+ }
+ uncompressedData.release();
+ } else {
+ if (isDictionary) {
+ stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
+ } else {
+ stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
+ }
+ }
+ return data;
+ }
+
+ // Read and decode the dictionary and the header
+ private void readDictionaryPage(final Future<ReadStatus> asyncPageRead,
+ final ColumnReader<?> parentStatus) throws UserException {
+ try {
+ Stopwatch timer = Stopwatch.createStarted();
+ ReadStatus readStatus = asyncPageRead.get();
+ long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+ stats.timeDiskScanWait.addAndGet(timeBlocked);
+ stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
+ stats.numDictPageLoads.incrementAndGet();
+ stats.timeDictPageLoads.addAndGet(timeBlocked+readStatus.getDiskScanTime());
+ readDictionaryPageData(readStatus, parentStatus);
+ } catch (Exception e) {
+ handleAndThrowException(e, "Error reading dictionary page.");
+ }
+ }
+
+ // Read and decode the dictionary data
+ private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
+ throws UserException {
+ try {
+ pageHeader = readStatus.getPageHeader();
+ int uncompressedSize = pageHeader.getUncompressed_page_size();
+ final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
+ Stopwatch timer = Stopwatch.createStarted();
+ allocatedDictionaryBuffers.add(dictionaryData);
+ DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
+ pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
+ valueOf(pageHeader.dictionary_page_header.encoding.name()));
+ this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
+ long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
+ stats.timeDictPageDecode.addAndGet(timeToDecode);
+ } catch (Exception e) {
+ handleAndThrowException(e, "Error decoding dictionary page.");
+ }
+ }
+
+ private void handleAndThrowException(Exception e, String msg) throws UserException {
+ UserException ex = UserException.dataReadError(e).message(msg)
+ .pushContext("Row Group Start: ", this.parentColumnReader.columnChunkMetaData.getStartingPos())
+ .pushContext("Column: ", this.parentColumnReader.schemaElement.getName())
+ .pushContext("File: ", this.fileName).build(logger);
+ throw ex;
+ }
+
+ private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
+ DrillBuf pageDataBuf = null;
+ Stopwatch timer = Stopwatch.createUnstarted();
+ long timeToRead;
+ int compressedSize = pageHeader.getCompressed_page_size();
+ int uncompressedSize = pageHeader.getUncompressed_page_size();
+ pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
+ try {
+ timer.start();
+ codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec())
+ .decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
+ pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
+ timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
+ } catch (IOException e) {
+ handleAndThrowException(e, "Error decompressing data.");
+ }
+ return pageDataBuf;
+ }
+
+ @Override protected void nextInternal() throws IOException {
+ ReadStatus readStatus = null;
+ try {
+ Stopwatch timer = Stopwatch.createStarted();
+ readStatus = asyncPageRead.get();
+ long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+ stats.timeDiskScanWait.addAndGet(timeBlocked);
+ stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
+ if (readStatus.isDictionaryPage) {
+ stats.numDictPageLoads.incrementAndGet();
+ stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+ } else {
+ stats.numDataPageLoads.incrementAndGet();
+ stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+ }
+ pageHeader = readStatus.getPageHeader();
+ // reset this. At the time of calling close, if this is not null then a pending asyncPageRead needs to be consumed
+ asyncPageRead = null;
+ } catch (Exception e) {
+ handleAndThrowException(e, "Error reading page data.");
+ }
+
+ // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
+ // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
+
+ do {
+ if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
+ readDictionaryPageData(readStatus, parentColumnReader);
+ // Ugly. Use the Async task to make a synchronous read call.
+ readStatus = new AsyncPageReaderTask().call();
+ pageHeader = readStatus.getPageHeader();
+ }
+ } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
+
+ if (dataReader.hasRemainder() && parentColumnReader.totalValuesRead + readStatus.getValuesRead()
+ < parentColumnReader.columnChunkMetaData.getValueCount()) {
+ asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+ }
+
+ pageHeader = readStatus.getPageHeader();
+ pageData = getDecompressedPageData(readStatus);
+
+ }
+
+
+ @Override public void clear() {
+ if (asyncPageRead != null) {
+ asyncPageRead.cancel(true);
+ try {
+ ReadStatus r = asyncPageRead.get();
+ r.getPageData().release();
+ } catch (Exception e) {
+ // Do nothing.
+ }
+ }
+ super.clear();
+ }
+
+ public static class ReadStatus {
+ private PageHeader pageHeader;
+ private DrillBuf pageData;
+ private boolean isDictionaryPage = false;
+ private long bytesRead = 0;
+ private long valuesRead = 0;
+ private long diskScanTime = 0;
+
+ public synchronized PageHeader getPageHeader() {
+ return pageHeader;
+ }
+
+ public synchronized void setPageHeader(PageHeader pageHeader) {
+ this.pageHeader = pageHeader;
+ }
+
+ public synchronized DrillBuf getPageData() {
+ return pageData;
+ }
+
+ public synchronized void setPageData(DrillBuf pageData) {
+ this.pageData = pageData;
+ }
+
+ public synchronized boolean isDictionaryPage() {
+ return isDictionaryPage;
+ }
+
+ public synchronized void setIsDictionaryPage(boolean isDictionaryPage) {
+ this.isDictionaryPage = isDictionaryPage;
+ }
+
+ public synchronized long getBytesRead() {
+ return bytesRead;
+ }
+
+ public synchronized void setBytesRead(long bytesRead) {
+ this.bytesRead = bytesRead;
+ }
+
+ public synchronized long getValuesRead() {
+ return valuesRead;
+ }
+
+ public synchronized void setValuesRead(long valuesRead) {
+ this.valuesRead = valuesRead;
+ }
+
+ public long getDiskScanTime() {
+ return diskScanTime;
+ }
+
+ public void setDiskScanTime(long diskScanTime) {
+ this.diskScanTime = diskScanTime;
+ }
+ }
+
+
+ private class AsyncPageReaderTask implements Callable<ReadStatus> {
+
+ private final AsyncPageReader parent = AsyncPageReader.this;
+
+ public AsyncPageReaderTask() {
+ }
+
+ @Override public ReadStatus call() throws IOException {
+ ReadStatus readStatus = new ReadStatus();
+
+ String oldname = Thread.currentThread().getName();
+ Thread.currentThread().setName(parent.parentColumnReader.columnChunkMetaData.toString());
+
+ long bytesRead = 0;
+ long valuesRead = 0;
+ Stopwatch timer = Stopwatch.createStarted();
+
+ DrillBuf pageData = null;
+ try {
+ PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
+ int compressedSize = pageHeader.getCompressed_page_size();
+ pageData = parent.dataReader.getNext(compressedSize);
+ bytesRead = compressedSize;
+ synchronized (parent) {
+ if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
+ readStatus.setIsDictionaryPage(true);
+ valuesRead += pageHeader.getDictionary_page_header().getNum_values();
+ } else {
+ valuesRead += pageHeader.getData_page_header().getNum_values();
+ }
+ long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+ readStatus.setPageHeader(pageHeader);
+ readStatus.setPageData(pageData);
+ readStatus.setBytesRead(bytesRead);
+ readStatus.setValuesRead(valuesRead);
+ readStatus.setDiskScanTime(timeToRead);
+ }
+
+ } catch (Exception e) {
+ if (pageData != null) {
+ pageData.release();
+ }
+ throw e;
+ }
+ Thread.currentThread().setName(oldname);
+ return readStatus;
+ }
+
+ }
+
+}
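
Stripped of the Parquet specifics, AsyncPageReader sets up a single-slot
read-ahead: one page read is always in flight on the scan pool, the consumer
blocks on the Future, and the next task is submitted as soon as the previous
result is taken (subject to the hasRemainder/valueCount checks in
nextInternal above). A self-contained sketch of just that pattern, with
byte[] and readOnePage() as stand-ins for ReadStatus and the real read:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    // Sketch (not the class above): single-slot read-ahead over an executor.
    class ReadAheadSketch {
      private final ExecutorService scanPool;  // shared pool, cf. getScanExecutor()
      private Future<byte[]> inFlight;         // stands in for Future<ReadStatus>

      ReadAheadSketch(ExecutorService scanPool) {
        this.scanPool = scanPool;
        this.inFlight = scanPool.submit((Callable<byte[]>) this::readOnePage); // prime the pipeline
      }

      byte[] next() throws Exception {
        byte[] page = inFlight.get();                                      // wait for the pending read
        inFlight = scanPool.submit((Callable<byte[]>) this::readOnePage);  // queue the next immediately
        return page;
      }

      private byte[] readOnePage() {
        return new byte[0];  // placeholder for the page-header parse and page bytes
      }
    }

Note that the clear() override above additionally has to drain a still-pending
Future and release its buffer before shutting down, which the sketch omits.
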
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index f62f424..6572c78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -22,6 +22,8 @@ import io.netty.buffer.DrillBuf;
import java.io.IOException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -70,7 +72,7 @@ public abstract class ColumnReader<V extends ValueVector> {
protected DrillBuf vectorData;
// when reading definition levels for nullable columns, it is a one-way stream of integers
// when reading var length data, where we don't know if all of the records will fit until we've read all of them
- // we must store the last definition level an use it in at the start of the next batch
+ // we must store the last definition level and use it at the start of the next batch
int currDefLevel;
// variables for a single read pass
@@ -84,7 +86,17 @@ public abstract class ColumnReader<V extends ValueVector> {
this.isFixedLength = fixedLength;
this.schemaElement = schemaElement;
this.valueVec = v;
- this.pageReader = new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(), columnChunkMetaData);
+ boolean useAsyncPageReader = parentReader.getFragmentContext().getOptions()
+ .getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
+ if (useAsyncPageReader) {
+ this.pageReader =
+ new AsyncPageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(),
+ columnChunkMetaData);
+ } else {
+ this.pageReader =
+ new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(),
+ columnChunkMetaData);
+ }
if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
@@ -117,11 +129,23 @@ public abstract class ColumnReader<V extends ValueVector> {
}
public void readValues(long recordsToRead) {
- readField(recordsToRead);
+ try {
+ readField(recordsToRead);
+
+ valuesReadInCurrentPass += recordsReadInThisIteration;
+ pageReader.valuesRead += recordsReadInThisIteration;
+ pageReader.readPosInBytes = readStartInBytes + readLength;
+ } catch (Exception e) {
+ UserException ex = UserException.dataReadError(e)
+ .message("Error reading from Parquet file")
+ .pushContext("Row Group Start: ", this.columnChunkMetaData.getStartingPos())
+ .pushContext("Column: ", this.schemaElement.getName())
+ .pushContext("File: ", this.parentReader.getHadoopPath().toString() )
+ .build(logger);
+ throw ex;
+
+ }
- valuesReadInCurrentPass += recordsReadInThisIteration;
- pageReader.valuesRead += recordsReadInThisIteration;
- pageReader.readPosInBytes = readStartInBytes + readLength;
}
protected abstract void readField(long recordsToRead);
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 078e4ce..c34ebd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
@@ -36,7 +35,6 @@ import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ValuesType;
import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
import org.apache.parquet.format.PageHeader;
@@ -58,25 +56,23 @@ import static org.apache.parquet.column.Encoding.valueOf;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
// class to keep track of the read position of variable length columns
-final class PageReader {
+class PageReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(
org.apache.drill.exec.store.parquet.columnreaders.PageReader.class);
public static final ParquetMetadataConverter METADATA_CONVERTER = ParquetFormatPlugin.parquetMetadataConverter;
- private final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentColumnReader;
+ protected final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentColumnReader;
//private final ColumnDataReader dataReader;
- private final DirectBufInputStream dataReader;
+ protected final DirectBufInputStream dataReader;
//der; buffer to store bytes of current page
- DrillBuf pageData;
+ protected DrillBuf pageData;
// for variable length data we need to keep track of our current position in the page data
// as the values and lengths are intermixed, making random access to the length data impossible
long readyToReadPosInBytes;
// read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData
long readPosInBytes;
- // bit shift needed for the next page if the last one did not line up with a byte boundary
- int bitShift;
// storage space for extra bits at the end of a page if they did not line up with a byte boundary
// prevents the need to keep the entire last page, as these pageDataByteArray need to be added to the next batch
//byte extraBits;
@@ -103,14 +99,18 @@ final class PageReader {
int currentPageCount = -1;
- private FSDataInputStream inputStream;
+ protected FSDataInputStream inputStream;
// These need to be held throughout reading of the entire column chunk
List<ByteBuf> allocatedDictionaryBuffers;
- private final CodecFactory codecFactory;
+ protected final CodecFactory codecFactory;
+ protected final String fileName;
- private final ParquetReaderStats stats;
+ protected final ParquetReaderStats stats;
+ private final boolean useBufferedReader;
+ private final int scanBufferSize;
+ private final boolean useFadvise;
PageReader(org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
throws ExecutionSetupException {
@@ -118,21 +118,24 @@ final class PageReader {
allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
codecFactory = parentColumnReader.parentReader.getCodecFactory();
this.stats = parentColumnReader.parentReader.parquetReaderStats;
- long start = columnChunkMetaData.getFirstDataPageOffset();
+ this.fileName = path.toString();
try {
inputStream = fs.open(path);
BufferAllocator allocator = parentColumnReader.parentReader.getOperatorContext().getAllocator();
- //TODO: make read batch size configurable
columnChunkMetaData.getTotalUncompressedSize();
- boolean useBufferedReader = parentColumnReader.parentReader.getFragmentContext().getOptions()
+ useBufferedReader = parentColumnReader.parentReader.getFragmentContext().getOptions()
.getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
+ scanBufferSize = parentColumnReader.parentReader.getFragmentContext().getOptions()
+ .getOption(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE).num_val.intValue();
+ useFadvise = parentColumnReader.parentReader.getFragmentContext().getOptions()
+ .getOption(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE).bool_val;
if (useBufferedReader) {
this.dataReader = new BufferedDirectBufInputStream(inputStream, allocator, path.getName(),
- columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), 8 * 1024 * 1024,
- true);
+ columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), scanBufferSize,
+ useFadvise);
} else {
this.dataReader = new DirectBufInputStream(inputStream, allocator, path.getName(),
- columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), true);
+ columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), useFadvise);
}
dataReader.init();
@@ -145,7 +148,7 @@ final class PageReader {
}
- private void loadDictionaryIfExists(final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus,
+ protected void loadDictionaryIfExists(final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus,
final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws IOException {
Stopwatch timer = Stopwatch.createUnstarted();
if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
@@ -153,7 +156,7 @@ final class PageReader {
long start=dataReader.getPos();
timer.start();
final PageHeader pageHeader = Util.readPageHeader(f);
- long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
long pageHeaderBytes=dataReader.getPos()-start;
this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes);
assert pageHeader.type == PageType.DICTIONARY_PAGE;
@@ -178,7 +181,7 @@ final class PageReader {
this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
}
- public DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize) throws IOException {
+ private DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize) throws IOException {
DrillBuf pageDataBuf = null;
Stopwatch timer = Stopwatch.createUnstarted();
long timeToRead;
@@ -186,7 +189,7 @@ final class PageReader {
if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) {
timer.start();
pageDataBuf = dataReader.getNext(compressedSize);
- timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, uncompressedSize);
} else {
DrillBuf compressedData = null;
@@ -195,8 +198,7 @@ final class PageReader {
try {
timer.start();
compressedData = dataReader.getNext(compressedSize);
- // dataReader.loadPage(compressedData, compressedSize);
- timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
timer.reset();
this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, compressedSize);
start=dataReader.getPos();
@@ -204,7 +206,7 @@ final class PageReader {
codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
.getCodec()).decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
- timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
this.updateStats(pageHeader, "Decompress", start, timeToRead, compressedSize, uncompressedSize);
} finally {
if(compressedData != null) {
@@ -219,25 +221,12 @@ final class PageReader {
return BytesInput.from(buf.nioBuffer(offset, length), 0, length);
}
+
/**
- * Grab the next page.
- *
- * @return - if another page was present
- * @throws IOException
+ * Get the page header and the pageData (uncompressed) for the next page
*/
- public boolean next() throws IOException {
+ protected void nextInternal() throws IOException{
Stopwatch timer = Stopwatch.createUnstarted();
- currentPageCount = -1;
- valuesRead = 0;
- valuesReadyToRead = 0;
-
- // TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause
- // and submit a bug report
- if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
- return false;
- }
- clearBuffers();
-
// next, we need to decompress the bytes
// TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
// I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
@@ -245,7 +234,7 @@ final class PageReader {
long start=dataReader.getPos();
timer.start();
pageHeader = Util.readPageHeader(dataReader);
- long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
long pageHeaderBytes=dataReader.getPos()-start;
this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes);
logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}","Page Header Read","",
@@ -264,14 +253,33 @@ final class PageReader {
int uncompressedSize = pageHeader.getUncompressed_page_size();
pageData = readPage(pageHeader, compressedSize, uncompressedSize);
- currentPageCount = pageHeader.data_page_header.num_values;
- final int uncompressedPageSize = pageHeader.uncompressed_page_size;
- final Statistics<?> stats = fromParquetStatistics(pageHeader.data_page_header.getStatistics(), parentColumnReader
- .getColumnDescriptor().getType());
+ }
+
+ /**
+ * Grab the next page.
+ *
+ * @return - if another page was present
+ * @throws IOException
+ */
+ public boolean next() throws IOException {
+ Stopwatch timer = Stopwatch.createUnstarted();
+ currentPageCount = -1;
+ valuesRead = 0;
+ valuesReadyToRead = 0;
+ // TODO - the metadata for total size appears to be incorrect for impala generated files, need to find cause
+ // and submit a bug report
+ if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
+ return false;
+ }
+ clearBuffers();
- final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
+ nextInternal();
+ timer.start();
+ currentPageCount = pageHeader.data_page_header.num_values;
+
+ final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
final Encoding valueEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.encoding);
@@ -321,40 +329,24 @@ final class PageReader {
// fit one record at a time, such as for variable length data. Both operations must start in the same location after the
// definition and repetition level data which is stored alongside the page data itself
readyToReadPosInBytes = readPosInBytes;
+ long timeDecode = timer.elapsed(TimeUnit.NANOSECONDS);
+ stats.numDataPagesDecoded.incrementAndGet();
+ stats.timeDataPageDecode.addAndGet(timeDecode);
return true;
}
/**
- * Allocate a page data buffer. Note that only one page data buffer should be active at a time. The reader will ensure
- * that the page data is released after the reader is completed.
- */
- private void allocatePageData(int size) {
- Preconditions.checkArgument(pageData == null);
- pageData = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
- }
-
- /**
* Allocate a buffer which the user should release immediately. The reader does not manage release of these buffers.
*/
- private DrillBuf allocateTemporaryBuffer(int size) {
+ protected DrillBuf allocateTemporaryBuffer(int size) {
return parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
}
- /**
- * Allocate and return a dictionary buffer. These are maintained for the life of the reader and then released when the
- * reader is cleared.
- */
- private DrillBuf allocateDictionaryBuffer(int size) {
- DrillBuf buf = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
- allocatedDictionaryBuffers.add(buf);
- return buf;
- }
-
protected boolean hasPage() {
return currentPageCount != -1;
}
- private void updateStats(PageHeader pageHeader, String op, long start, long time, long bytesin, long bytesout) {
+ protected void updateStats(PageHeader pageHeader, String op, long start, long time, long bytesin, long bytesout) {
String pageType = "Data Page";
if (pageHeader.type == PageType.DICTIONARY_PAGE) {
pageType = "Dictionary Page";
@@ -362,37 +354,38 @@ final class PageReader {
logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}", op, pageType.toString(),
this.parentColumnReader.parentReader.hadoopPath,
this.parentColumnReader.columnDescriptor.toString(), start, bytesin, bytesout, time);
+
if (pageHeader.type != PageType.DICTIONARY_PAGE) {
if (bytesin == bytesout) {
- this.stats.timePageLoads += time;
- this.stats.numPageLoads++;
- this.stats.totalPageReadBytes += bytesin;
+ this.stats.timeDataPageLoads.addAndGet(time);
+ this.stats.numDataPageLoads.incrementAndGet();
+ this.stats.totalDataPageReadBytes.addAndGet(bytesin);
} else {
- this.stats.timePagesDecompressed += time;
- this.stats.numPagesDecompressed++;
- this.stats.totalDecompressedBytes += bytesin;
+ this.stats.timeDataPagesDecompressed.addAndGet(time);
+ this.stats.numDataPagesDecompressed.incrementAndGet();
+ this.stats.totalDataDecompressedBytes.addAndGet(bytesin);
}
} else {
if (bytesin == bytesout) {
- this.stats.timeDictPageLoads += time;
- this.stats.numDictPageLoads++;
- this.stats.totalDictPageReadBytes += bytesin;
+ this.stats.timeDictPageLoads.addAndGet(time);
+ this.stats.numDictPageLoads.incrementAndGet();
+ this.stats.totalDictPageReadBytes.addAndGet(bytesin);
} else {
- this.stats.timeDictPagesDecompressed += time;
- this.stats.numDictPagesDecompressed++;
- this.stats.totalDictDecompressedBytes += bytesin;
+ this.stats.timeDictPagesDecompressed.addAndGet(time);
+ this.stats.numDictPagesDecompressed.incrementAndGet();
+ this.stats.totalDictDecompressedBytes.addAndGet(bytesin);
}
}
}
- public void clearBuffers() {
+ protected void clearBuffers() {
if (pageData != null) {
pageData.release();
pageData = null;
}
}
- public void clearDictionaryBuffers() {
+ protected void clearDictionaryBuffers() {
for (ByteBuf b : allocatedDictionaryBuffers) {
b.release();
}
@@ -401,15 +394,14 @@ final class PageReader {
public void clear(){
try {
+ this.inputStream.close();
this.dataReader.close();
} catch (IOException e) {
- //TODO: Throw UserException
+ //Swallow the exception which is OK for input streams
}
// Free all memory, including fixed length types. (Data is being copied for all types not just var length types)
- //if(!this.parentColumnReader.isFixedLength) {
clearBuffers();
clearDictionaryBuffers();
- //}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 924887e..1eca00f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
@@ -117,16 +118,39 @@ public class ParquetRecordReader extends AbstractRecordReader {
public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
+ public enum Metric implements MetricDef {
+ NUM_DICT_PAGE_LOADS, // Number of dictionary pages read
+ NUM_DATA_PAGE_LOADS, // Number of data pages read
+ NUM_DATA_PAGES_DECODED, // Number of data pages decoded
+ NUM_DICT_PAGES_DECOMPRESSED, // Number of dictionary pages decompressed
+ NUM_DATA_PAGES_DECOMPRESSED, // Number of data pages decompressed
+ TOTAL_DICT_PAGE_READ_BYTES, // Total bytes read from disk for dictionary pages
+ TOTAL_DATA_PAGE_READ_BYTES, // Total bytes read from disk for data pages
+ TOTAL_DICT_DECOMPRESSED_BYTES, // Total bytes decompressed for dictionary pages (same as compressed bytes on disk)
+ TOTAL_DATA_DECOMPRESSED_BYTES, // Total bytes decompressed for data pages (same as compressed bytes on disk)
+ TIME_DICT_PAGE_LOADS, // Time in nanos spent reading dictionary pages from disk
+ TIME_DATA_PAGE_LOADS, // Time in nanos spent reading data pages from disk
+ TIME_DATA_PAGE_DECODE, // Time in nanos spent decoding data pages
+ TIME_DICT_PAGE_DECODE, // Time in nanos spent decoding dictionary pages
+ TIME_DICT_PAGES_DECOMPRESSED, // Time in nanos spent decompressing dictionary pages
+ TIME_DATA_PAGES_DECOMPRESSED, // Time in nanos spent decompressing data pages
+ TIME_DISK_SCAN_WAIT, // Time in nanos spent waiting for an async disk read to complete
+ TIME_DISK_SCAN; // Time in nanos spent reading data from disk
+
+ @Override public int metricId() {
+ return ordinal();
+ }
+ }
+
public ParquetRecordReader(FragmentContext fragmentContext,
String path,
int rowGroupIndex,
- long numRecordsToRead,
+ long numRecordsToRead,
FileSystem fs,
CodecFactory codecFactory,
ParquetMetadata footer,
- List<SchemaPath> columns,
- ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
- throws ExecutionSetupException {
+ List<SchemaPath> columns,
+ ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException {
this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, numRecordsToRead,
path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
}
@@ -470,6 +494,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
// No columns found in the file were selected, simply return a full batch of null records for each column requested
if (firstColumnStatus == null) {
if (mockRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount()) {
+ updateStats();
return 0;
}
recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
@@ -483,6 +508,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
mockRecordsRead += recordsToRead;
totalRecordsRead += recordsToRead;
numRecordsToRead -= recordsToRead;
+ updateStats();
return (int) recordsToRead;
}
@@ -514,6 +540,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
// logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
numRecordsToRead -= firstColumnStatus.getRecordsReadInCurrentPass();
+ updateStats();
return firstColumnStatus.getRecordsReadInCurrentPass();
} catch (Exception e) {
handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +
@@ -530,7 +557,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
@Override
public void close() {
- logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
+ logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex,
+ hadoopPath.toUri().getPath());
// enable this for debugging when it is known that a whole file will be read
// limit kills upstream operators once it has enough records, so this assert will fail
// assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
@@ -552,29 +580,67 @@ public class ParquetRecordReader extends AbstractRecordReader {
varLengthReader = null;
}
+
if(parquetReaderStats != null) {
- logger.trace("ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
+ logger.trace("ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
hadoopPath,
- parquetReaderStats.numDictPageHeaders,
- parquetReaderStats.numPageHeaders,
parquetReaderStats.numDictPageLoads,
- parquetReaderStats.numPageLoads,
+ parquetReaderStats.numDataPageLoads,
+ parquetReaderStats.numDataPagesDecoded,
parquetReaderStats.numDictPagesDecompressed,
- parquetReaderStats.numPagesDecompressed,
- parquetReaderStats.totalDictPageHeaderBytes,
- parquetReaderStats.totalPageHeaderBytes,
+ parquetReaderStats.numDataPagesDecompressed,
parquetReaderStats.totalDictPageReadBytes,
- parquetReaderStats.totalPageReadBytes,
+ parquetReaderStats.totalDataPageReadBytes,
parquetReaderStats.totalDictDecompressedBytes,
- parquetReaderStats.totalDecompressedBytes,
- parquetReaderStats.timeDictPageHeaders,
- parquetReaderStats.timePageHeaders,
+ parquetReaderStats.totalDataDecompressedBytes,
parquetReaderStats.timeDictPageLoads,
- parquetReaderStats.timePageLoads,
+ parquetReaderStats.timeDataPageLoads,
+ parquetReaderStats.timeDataPageDecode,
+ parquetReaderStats.timeDictPageDecode,
parquetReaderStats.timeDictPagesDecompressed,
- parquetReaderStats.timePagesDecompressed);
+ parquetReaderStats.timeDataPagesDecompressed,
+ parquetReaderStats.timeDiskScanWait,
+ parquetReaderStats.timeDiskScan
+ );
parquetReaderStats=null;
}
+
+ }
+
+ private void updateStats(){
+
+ operatorContext.getStats().setLongStat(Metric.NUM_DICT_PAGE_LOADS,
+ parquetReaderStats.numDictPageLoads.longValue());
+ operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGE_LOADS, parquetReaderStats.numDataPageLoads.longValue());
+ operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGES_DECODED, parquetReaderStats.numDataPagesDecoded.longValue());
+ operatorContext.getStats().setLongStat(Metric.NUM_DICT_PAGES_DECOMPRESSED,
+ parquetReaderStats.numDictPagesDecompressed.longValue());
+ operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGES_DECOMPRESSED,
+ parquetReaderStats.numDataPagesDecompressed.longValue());
+ operatorContext.getStats().setLongStat(Metric.TOTAL_DICT_PAGE_READ_BYTES,
+ parquetReaderStats.totalDictPageReadBytes.longValue());
+ operatorContext.getStats().setLongStat(Metric.TOTAL_DATA_PAGE_READ_BYTES,
+ parquetReaderStats.totalDataPageReadBytes.longValue());
+ operatorContext.getStats().setLongStat(Metric.TOTAL_DICT_DECOMPRESSED_BYTES,
+ parquetReaderStats.totalDictDecompressedBytes.longValue());
+ operatorContext.getStats().setLongStat(Metric.TOTAL_DATA_DECOMPRESSED_BYTES,
+ parquetReaderStats.totalDataDecompressedBytes.longValue());
+ operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGE_LOADS,
+ parquetReaderStats.timeDictPageLoads.longValue());
+ operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGE_LOADS,
+ parquetReaderStats.timeDataPageLoads.longValue());
+ operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGE_DECODE,
+ parquetReaderStats.timeDataPageDecode.longValue());
+ operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGE_DECODE,
+ parquetReaderStats.timeDictPageDecode.longValue());
+ operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGES_DECOMPRESSED,
+ parquetReaderStats.timeDictPagesDecompressed.longValue());
+ operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGES_DECOMPRESSED,
+ parquetReaderStats.timeDataPagesDecompressed.longValue());
+ operatorContext.getStats().setLongStat(Metric.TIME_DISK_SCAN_WAIT,
+ parquetReaderStats.timeDiskScanWait.longValue());
+ operatorContext.getStats().setLongStat(Metric.TIME_DISK_SCAN, parquetReaderStats.timeDiskScan.longValue());
+
}
@Override
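
A note on the Metric enum added above: metricId() delegates to ordinal(), so each metric's id is simply its position in the declaration. A small sketch of the consequence (ExampleMetric is hypothetical; only MetricDef comes from the patch):

  // Ids are positional, so new metrics must be appended at the end;
  // inserting one in the middle would silently renumber every metric
  // after the insertion point and misattribute previously recorded values.
  public enum ExampleMetric implements MetricDef {
    BYTES_READ,      // metricId() == 0
    PAGES_DECODED;   // metricId() == 1; append new metrics after this

    @Override public int metricId() {
      return ordinal();
    }
  }
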
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index 6ca0205..e03d930 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -17,15 +17,17 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
+import org.apache.drill.exec.vector.ValueVector;
+
import java.io.IOException;
import java.util.List;
public class VarLenBinaryReader {
ParquetRecordReader parentReader;
- final List<VarLengthColumn<?>> columns;
+ final List<VarLengthColumn<? extends ValueVector>> columns;
- public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<?>> columns) {
+ public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<? extends ValueVector>> columns) {
this.parentReader = parentReader;
this.columns = columns;
}
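
The generics change above tightens List<VarLengthColumn<?>> to List<VarLengthColumn<? extends ValueVector>>: with an unbounded wildcard the compiler knows nothing about the type parameter, while the bounded form preserves the upper bound, so code walking the columns can treat each column's vector as a ValueVector without casts. A self-contained sketch of the difference (Holder is illustrative, not from the patch):

  class Holder<T> {
    T value;
    T get() { return value; }
  }

  void demo(Holder<?> unbounded, Holder<? extends Number> bounded) {
    Object o = unbounded.get();  // only Object is known for an unbounded wildcard
    Number n = bounded.get();    // the upper bound survives, so no cast is needed
  }
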
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
index 6aa968a..a5a6b81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -73,7 +72,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
/**
* The current read position in the buffer; the index of the next
* character to be read from the <code>internalBuffer</code> array.
- * <p>
+ * <p/>
* This value is always in the range <code>[0,count]</code>.
If <code>curPosInBuffer</code> is equal to <code>count</code> then we have read
* all the buffered data and the next read (or skip) will require more data to be read
@@ -128,8 +127,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
}
- @Override
- public void init() throws UnsupportedOperationException, IOException {
+ @Override public void init() throws UnsupportedOperationException, IOException {
super.init();
this.internalBuffer = this.allocator.buffer(this.bufSize);
this.tempBuffer = this.allocator.buffer(defaultTempBufferSize);
@@ -180,10 +178,10 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
this.curPosInStream = getInputStream().getPos();
bytesRead = nBytes;
logger.trace(
- "Stream: {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, " +
- "CurPosInStream: {}, CurPosInBuffer: {}",
- this.streamId, this.startOffset, this.totalByteSize, this.bufSize, bytesRead, this.count,
- this.curPosInStream, this.curPosInBuffer);
+ "Stream: {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, "
+ + "CurPosInStream: {}, CurPosInBuffer: {}", this.streamId, this.startOffset,
+ this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream,
+ this.curPosInBuffer);
}
}
return this.count - this.curPosInBuffer;
@@ -252,8 +250,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
}
/**
- Has the same contract as {@link java.io.InputStream#read(byte[], int, int)}
- Except with DrillBuf
+ * Has the same contract as {@link java.io.InputStream#read(byte[], int, int)},
+ * except that it reads into a DrillBuf
*/
public synchronized int read(DrillBuf buf, int off, int len) throws IOException {
checkInputStreamState();
@@ -296,7 +294,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
return 0;
}
DrillBuf byteBuf;
- if(len <= defaultTempBufferSize){
+ if (len <= defaultTempBufferSize) {
byteBuf = tempBuffer;
} else {
byteBuf = this.allocator.buffer(len);
@@ -318,7 +316,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
}
} while (bytesRead < len);
- if(len > defaultTempBufferSize){
+ if (len > defaultTempBufferSize) {
byteBuf.release();
}
@@ -327,12 +325,11 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
/**
- Has the same contract as {@link java.io.InputStream#skip(long)}
+ * Has the same contract as {@link java.io.InputStream#skip(long)}.
* Skips up to the next n bytes.
* Skip may return having skipped fewer than n bytes
*/
- @Override
- public synchronized long skip(long n) throws IOException {
+ @Override public synchronized long skip(long n) throws IOException {
checkInputStreamState();
long bytesAvailable = this.count - this.curPosInBuffer;
long bytesSkipped = 0;
@@ -353,8 +350,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
}
- @Override
- public synchronized int available() throws IOException {
+ @Override public synchronized int available() throws IOException {
checkInputStreamState();
int bytesAvailable = this.count - this.curPosInBuffer;
int underlyingAvailable = getInputStream().available();
@@ -365,18 +361,15 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
return available;
}
- @Override
- public synchronized void mark(int readlimit) {
+ @Override public synchronized void mark(int readlimit) {
throw new UnsupportedOperationException("Mark/reset is not supported.");
}
- @Override
- public synchronized void reset() throws IOException {
+ @Override public synchronized void reset() throws IOException {
throw new UnsupportedOperationException("Mark/reset is not supported.");
}
- @Override
- public boolean markSupported() {
+ @Override public boolean markSupported() {
return false;
}
@@ -384,7 +377,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
Returns the current position from the beginning of the underlying input stream
*/
public long getPos() throws IOException {
- return curPosInBuffer+startOffset;
+ return curPosInBuffer + startOffset;
}
public boolean hasRemainder() throws IOException {
@@ -412,6 +405,11 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
}
}
+ /**
+ * Uncomment to test against Parquet files that are too big to check in as unit test resources
+ * @param args
+ */
+ /*
public static void main(String[] args) {
final DrillConfig config = DrillConfig.create();
final BufferAllocator allocator = RootAllocatorFactory.newRoot(config);
@@ -433,8 +431,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
long totalByteSize = columnMetadata.getTotalSize();
String streamId = fileName + ":" + columnMetadata.toString();
BufferedDirectBufInputStream reader =
- new BufferedDirectBufInputStream(inputStream, allocator, streamId, startOffset,
- totalByteSize, BUFSZ, true);
+ new BufferedDirectBufInputStream(inputStream, allocator, streamId, startOffset, totalByteSize,
+ BUFSZ, true);
reader.init();
while (true) {
try {
@@ -457,4 +455,5 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
allocator.close();
return;
}
+ */
}
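
Since read(DrillBuf, int, int) above keeps the InputStream#read contract, a single call may return fewer than len bytes, and -1 signals end of stream, so callers are expected to loop. A hedged usage sketch (reader and buf are assumed to be an initialized BufferedDirectBufInputStream and a DrillBuf with at least len writable bytes):

  int off = 0;
  while (off < len) {
    int n = reader.read(buf, off, len - off);
    if (n < 0) {
      break;  // end of stream reached before len bytes were read
    }
    off += n;
  }
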
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 60dcf15..deb31b3 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -194,6 +194,10 @@ drill.exec: {
},
debug: {
return_error_for_failure_in_cancelled_fragments: false
+ },
+ scan: {
+ threadpool_size: 8,
+ decode_threadpool_size: 1
}
udf: {
retry-attempts: 5,
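
The two new keys above size the scan and decode thread pools used by the async page reader. They are boot-time options read through DrillConfig (a typesafe-config wrapper); a sketch with the key strings inlined for illustration (the patch keeps the actual constants in ExecConstants, whose names may differ):

  import org.apache.drill.common.config.DrillConfig;

  public class ScanPoolConfigSketch {
    public static void main(String[] args) {
      DrillConfig config = DrillConfig.create();
      // Keys match the drill-module.conf entries above; defaults are 8 and 1.
      int scanThreads = config.getInt("drill.exec.scan.threadpool_size");
      int decodeThreads = config.getInt("drill.exec.scan.decode_threadpool_size");
      System.out.println(scanThreads + " scan / " + decodeThreads + " decode");
    }
  }
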
http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/resources/rest/profile/profile.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
index ef971fb..c0f2d8e 100644
--- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl
+++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
@@ -221,7 +221,7 @@
</h4>
</div>
<div id="${op.getId()}-metrics" class="panel-collapse collapse">
- <div class="panel-body">
+ <div class="panel-body" style="display:block;overflow-x:auto">
${op.getMetricsTable()}
</div>
</div>