You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/12/15 01:58:01 UTC
drill git commit: DRILL-4152: Add trace logging to Parquet reader for
performance tuning. This closes #298
Repository: drill
Updated Branches:
refs/heads/master e529df460 -> 1e45f9fa1
DRILL-4152: Add trace logging to Parquet reader for performance tuning. This closes #298
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1e45f9fa
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1e45f9fa
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1e45f9fa
Branch: refs/heads/master
Commit: 1e45f9fa19363a9a337401b7fa64e198480b694a
Parents: e529df4
Author: Parth Chandra <pa...@apache.org>
Authored: Thu Dec 3 14:32:30 2015 -0800
Committer: Parth Chandra <pa...@apache.org>
Committed: Mon Dec 14 16:56:55 2015 -0800
----------------------------------------------------------------------
.../exec/store/parquet/ParquetReaderStats.java | 48 ++++++++++
.../store/parquet/ParquetScanBatchCreator.java | 10 ++-
.../store/parquet/columnreaders/PageReader.java | 94 ++++++++++++++++----
.../columnreaders/ParquetRecordReader.java | 30 ++++++-
4 files changed, 163 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/1e45f9fa/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
new file mode 100644
index 0000000..e95b0c8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+/**
+ * Plain holder for the counters gathered while a Parquet file is read.
+ *
+ * <p>{@code ParquetRecordReader} creates one instance per reader, {@code PageReader.updateStats}
+ * adds to the fields, and the record reader writes every field to its "ParquetTrace,Summary"
+ * trace line in the order the fields are declared here.
+ *
+ * <p>Each metric exists twice: the {@code Dict} variant covers dictionary pages, the other
+ * variant covers every other page type.
+ *
+ * <p>NOTE(review): the fields are bare public longs with no synchronization; presumably an
+ * instance is only touched by one thread at a time -- confirm before sharing it.
+ */
+public class ParquetReaderStats {
+
+ // Operation counts. The *PageHeaders fields are presumably meant for page-header reads
+ // (TODO confirm where they are updated); *PageLoads and *PagesDecompressed are incremented
+ // once per recorded load / decompression.
+ public long numDictPageHeaders;
+ public long numPageHeaders;
+ public long numDictPageLoads;
+ public long numPageLoads;
+ public long numDictPagesDecompressed;
+ public long numPagesDecompressed;
+
+ // Byte totals. PageReader.updateStats adds the input byte count ("bytesin") of each recorded
+ // operation, for loads and for decompressions alike.
+ public long totalDictPageHeaderBytes;
+ public long totalPageHeaderBytes;
+ public long totalDictPageReadBytes;
+ public long totalPageReadBytes;
+ public long totalDictDecompressedBytes;
+ public long totalDecompressedBytes;
+
+ // Accumulated elapsed times. PageReader measures them with
+ // Stopwatch.elapsed(TimeUnit.MICROSECONDS), so the unit is microseconds.
+ public long timeDictPageHeaders;
+ public long timePageHeaders;
+ public long timeDictPageLoads;
+ public long timePageLoads;
+ public long timeDictPagesDecompressed;
+ public long timePagesDecompressed;
+
+ // Nothing to initialize: every counter starts at the Java default of zero.
+ public ParquetReaderStats() {
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/drill/blob/1e45f9fa/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d743fe1..0cb12f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -21,9 +21,11 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.base.Stopwatch;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -122,9 +124,13 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
These fields will be added to the constructor below
*/
try {
+ Stopwatch timer = new Stopwatch();
if ( ! footers.containsKey(e.getPath())){
- footers.put(e.getPath(),
- ParquetFileReader.readFooter(conf, new Path(e.getPath())));
+ timer.start();
+ ParquetMetadata footer = ParquetFileReader.readFooter(conf, new Path(e.getPath()));
+ long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", e.getPath(), "", 0, 0, 0, timeToRead);
+ footers.put(e.getPath(), footer );
}
if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
readers.add(
http://git-wip-us.apache.org/repos/asf/drill/blob/1e45f9fa/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 762f91a..bcc7b33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -26,10 +26,12 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.store.parquet.ColumnDataReader;
import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -52,6 +54,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.PrimitiveType;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
// class to keep track of the read position of variable length columns
final class PageReader {
@@ -98,22 +101,26 @@ final class PageReader {
int currentPageCount = -1;
+ private FSDataInputStream inputStream;
+
// These need to be held throughout reading of the entire column chunk
List<ByteBuf> allocatedDictionaryBuffers;
private final CodecFactory codecFactory;
+ private final ParquetReaderStats stats;
+
PageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
throws ExecutionSetupException{
this.parentColumnReader = parentStatus;
allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
codecFactory = parentColumnReader.parentReader.getCodecFactory();
-
+ this.stats = parentColumnReader.parentReader.parquetReaderStats;
long start = columnChunkMetaData.getFirstDataPageOffset();
try {
- FSDataInputStream f = fs.open(path);
- this.dataReader = new ColumnDataReader(f, start, columnChunkMetaData.getTotalSize());
- loadDictionaryIfExists(parentStatus, columnChunkMetaData, f);
+ inputStream = fs.open(path);
+ this.dataReader = new ColumnDataReader(inputStream, start, columnChunkMetaData.getTotalSize());
+ loadDictionaryIfExists(parentStatus, columnChunkMetaData, inputStream);
} catch (IOException e) {
throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
@@ -124,9 +131,15 @@ final class PageReader {
private void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
final ColumnChunkMetaData columnChunkMetaData, final FSDataInputStream f) throws IOException {
+ Stopwatch timer = new Stopwatch();
if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
f.seek(columnChunkMetaData.getDictionaryPageOffset());
+ long start=f.getPos();
+ timer.start();
final PageHeader pageHeader = Util.readPageHeader(f);
+ long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ long pageHeaderBytes=f.getPos()-start;
+ this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes);
assert pageHeader.type == PageType.DICTIONARY_PAGE;
readDictionaryPage(pageHeader, parentStatus);
}
@@ -138,7 +151,7 @@ final class PageReader {
int uncompressedSize = pageHeader.getUncompressed_page_size();
final DrillBuf dictionaryData = allocateDictionaryBuffer(uncompressedSize);
- readPage(compressedSize, uncompressedSize, dictionaryData);
+ readPage(pageHeader, compressedSize, uncompressedSize, dictionaryData);
DictionaryPage page = new DictionaryPage(
asBytesInput(dictionaryData, 0, uncompressedSize),
@@ -149,20 +162,30 @@ final class PageReader {
this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
}
- public void readPage(int compressedSize, int uncompressedSize, DrillBuf dest) throws IOException {
+ public void readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize, DrillBuf dest) throws IOException {
+ Stopwatch timer = new Stopwatch();
+ long timeToRead;
+ long start=inputStream.getPos();
if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) {
+ timer.start();
dataReader.loadPage(dest, compressedSize);
+ timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, uncompressedSize);
} else {
final DrillBuf compressedData = allocateTemporaryBuffer(compressedSize);
try {
- dataReader.loadPage(compressedData, compressedSize);
- codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
- .getCodec()).decompress(
- compressedData.nioBuffer(0, compressedSize),
- compressedSize,
- dest.nioBuffer(0, uncompressedSize),
- uncompressedSize);
-
+ timer.start();
+ dataReader.loadPage(compressedData, compressedSize);
+ timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ timer.reset();
+ this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, compressedSize);
+ start = inputStream.getPos();
+ timer.start();
+ codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
+ .getCodec()).decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
+ dest.nioBuffer(0, uncompressedSize), uncompressedSize);
+ timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ this.updateStats(pageHeader, "Decompress", start, timeToRead, compressedSize, uncompressedSize);
} finally {
compressedData.release();
}
@@ -180,7 +203,7 @@ final class PageReader {
* @throws java.io.IOException
*/
public boolean next() throws IOException {
-
+ Stopwatch timer = new Stopwatch();
currentPageCount = -1;
valuesRead = 0;
valuesReadyToRead = 0;
@@ -196,7 +219,15 @@ final class PageReader {
// TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
// I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
do {
+ long start=inputStream.getPos();
+ timer.start();
pageHeader = dataReader.readPageHeader();
+ long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+ this.updateStats(pageHeader, "Page Header Read", start, timeToRead, 0,0);
+ logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}","Page Header Read","",
+ this.parentColumnReader.parentReader.hadoopPath,
+ this.parentColumnReader.columnDescriptor.toString(), start, 0, 0, timeToRead);
+ timer.reset();
if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
readDictionaryPage(pageHeader, parentColumnReader);
}
@@ -207,7 +238,7 @@ final class PageReader {
allocatePageData(pageHeader.getUncompressed_page_size());
int compressedSize = pageHeader.getCompressed_page_size();
int uncompressedSize = pageHeader.getUncompressed_page_size();
- readPage(compressedSize, uncompressedSize, pageData);
+ readPage(pageHeader, compressedSize, uncompressedSize, pageData);
currentPageCount = pageHeader.data_page_header.num_values;
@@ -298,6 +329,37 @@ final class PageReader {
return currentPageCount != -1;
}
+ private void updateStats(PageHeader pageHeader, String op, long start, long time, long bytesin, long bytesout) {
+ String pageType = "Data Page";
+ if (pageHeader.type == PageType.DICTIONARY_PAGE) {
+ pageType = "Dictionary Page";
+ }
+ logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}", op, pageType.toString(),
+ this.parentColumnReader.parentReader.hadoopPath,
+ this.parentColumnReader.columnDescriptor.toString(), start, bytesin, bytesout, time);
+ if (pageHeader.type != PageType.DICTIONARY_PAGE) {
+ if (bytesin == bytesout) {
+ this.stats.timePageLoads += time;
+ this.stats.numPageLoads++;
+ this.stats.totalPageReadBytes += bytesin;
+ } else {
+ this.stats.timePagesDecompressed += time;
+ this.stats.numPagesDecompressed++;
+ this.stats.totalDecompressedBytes += bytesin;
+ }
+ } else {
+ if (bytesin == bytesout) {
+ this.stats.timeDictPageLoads += time;
+ this.stats.numDictPageLoads++;
+ this.stats.totalDictPageReadBytes += bytesin;
+ } else {
+ this.stats.timeDictPagesDecompressed += time;
+ this.stats.numDictPagesDecompressed++;
+ this.stats.totalDictDecompressedBytes += bytesin;
+ }
+ }
+ }
+
public void clearBuffers() {
if (pageData != null) {
pageData.release();
http://git-wip-us.apache.org/repos/asf/drill/blob/1e45f9fa/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 47d502f..7131b6c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -34,11 +34,13 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.MaterializedField.Key;
import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -105,13 +107,15 @@ public class ParquetRecordReader extends AbstractRecordReader {
long totalRecordsRead;
private final FragmentContext fragmentContext;
+ public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
+
public ParquetRecordReader(FragmentContext fragmentContext,
String path,
int rowGroupIndex,
FileSystem fs,
CodecFactory codecFactory,
ParquetMetadata footer,
- List<SchemaPath> columns) throws ExecutionSetupException {
+ List<SchemaPath> columns) throws ExecutionSetupException {
this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer,
columns);
}
@@ -479,5 +483,29 @@ public class ParquetRecordReader extends AbstractRecordReader {
varLengthReader.columns.clear();
varLengthReader = null;
}
+
+ if(parquetReaderStats != null) {
+ logger.trace("ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
+ hadoopPath,
+ parquetReaderStats.numDictPageHeaders,
+ parquetReaderStats.numPageHeaders,
+ parquetReaderStats.numDictPageLoads,
+ parquetReaderStats.numPageLoads,
+ parquetReaderStats.numDictPagesDecompressed,
+ parquetReaderStats.numPagesDecompressed,
+ parquetReaderStats.totalDictPageHeaderBytes,
+ parquetReaderStats.totalPageHeaderBytes,
+ parquetReaderStats.totalDictPageReadBytes,
+ parquetReaderStats.totalPageReadBytes,
+ parquetReaderStats.totalDictDecompressedBytes,
+ parquetReaderStats.totalDecompressedBytes,
+ parquetReaderStats.timeDictPageHeaders,
+ parquetReaderStats.timePageHeaders,
+ parquetReaderStats.timeDictPageLoads,
+ parquetReaderStats.timePageLoads,
+ parquetReaderStats.timeDictPagesDecompressed,
+ parquetReaderStats.timePagesDecompressed);
+ parquetReaderStats=null;
+ }
}
}