Posted to commits@drill.apache.org by pa...@apache.org on 2015/12/15 01:58:01 UTC

drill git commit: DRILL-4152: Add trace logging to Parquet reader for performance tuning. This closes #298

Repository: drill
Updated Branches:
  refs/heads/master e529df460 -> 1e45f9fa1


DRILL-4152: Add trace logging to Parquet reader for performance tuning. This closes #298


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1e45f9fa
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1e45f9fa
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1e45f9fa

Branch: refs/heads/master
Commit: 1e45f9fa19363a9a337401b7fa64e198480b694a
Parents: e529df4
Author: Parth Chandra <pa...@apache.org>
Authored: Thu Dec 3 14:32:30 2015 -0800
Committer: Parth Chandra <pa...@apache.org>
Committed: Mon Dec 14 16:56:55 2015 -0800

----------------------------------------------------------------------
 .../exec/store/parquet/ParquetReaderStats.java  | 48 ++++++++++
 .../store/parquet/ParquetScanBatchCreator.java  | 10 ++-
 .../store/parquet/columnreaders/PageReader.java | 94 ++++++++++++++++----
 .../columnreaders/ParquetRecordReader.java      | 30 ++++++-
 4 files changed, 163 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1e45f9fa/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
new file mode 100644
index 0000000..e95b0c8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+public class ParquetReaderStats {
+
+  public long numDictPageHeaders;
+  public long numPageHeaders;
+  public long numDictPageLoads;
+  public long numPageLoads;
+  public long numDictPagesDecompressed;
+  public long numPagesDecompressed;
+
+  public long totalDictPageHeaderBytes;
+  public long totalPageHeaderBytes;
+  public long totalDictPageReadBytes;
+  public long totalPageReadBytes;
+  public long totalDictDecompressedBytes;
+  public long totalDecompressedBytes;
+
+  public long timeDictPageHeaders;
+  public long timePageHeaders;
+  public long timeDictPageLoads;
+  public long timePageLoads;
+  public long timeDictPagesDecompressed;
+  public long timePagesDecompressed;
+
+  public ParquetReaderStats() {
+  }
+
+}
+
+

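The counters above form matched triplets (count, bytes, time) for three phases
of page handling (header read, page load, decompress), with each triplet split
between dictionary pages and data pages. All time* fields accumulate
microseconds. For tuning, the useful derived quantity is throughput; the
following is a minimal sketch of that arithmetic (the ParquetReaderStatsReport
class is hypothetical, not part of this commit, and assumes it sits in the same
package as ParquetReaderStats):

import java.util.Locale;

// Hypothetical helper: derives MiB/s from the ParquetReaderStats counters.
// Field names match the class above; all time* fields are in microseconds.
public class ParquetReaderStatsReport {

  static double mibPerSec(long bytes, long micros) {
    if (micros == 0) {
      return 0.0; // nothing timed yet
    }
    return (bytes / (1024.0 * 1024.0)) / (micros / 1_000_000.0);
  }

  public static String summarize(ParquetReaderStats s) {
    return String.format(Locale.ROOT,
        "data page reads: %.1f MiB/s, decompress: %.1f MiB/s",
        mibPerSec(s.totalPageReadBytes, s.timePageLoads),
        mibPerSec(s.totalDecompressedBytes, s.timePagesDecompressed));
  }
}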
http://git-wip-us.apache.org/repos/asf/drill/blob/1e45f9fa/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d743fe1..0cb12f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -21,9 +21,11 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Stopwatch;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -122,9 +124,13 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       These fields will be added to the constructor below
       */
       try {
+        Stopwatch timer = new Stopwatch();
         if ( ! footers.containsKey(e.getPath())){
-          footers.put(e.getPath(),
-              ParquetFileReader.readFooter(conf, new Path(e.getPath())));
+          timer.start();
+          ParquetMetadata footer = ParquetFileReader.readFooter(conf, new Path(e.getPath()));
+          long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+          logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", e.getPath(), "", 0, 0, 0, timeToRead);
+          footers.put(e.getPath(), footer);
         }
         if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
           readers.add(

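This hunk establishes the timing idiom the rest of the patch repeats: start a
Guava Stopwatch, perform the I/O, read off elapsed microseconds, and emit one
flat comma-separated "ParquetTrace" record at trace level. (new Stopwatch() is
the constructor of the older Guava Drill bundled at the time; Guava 15+ spells
it Stopwatch.createStarted()/createUnstarted().) A self-contained sketch of
the idiom, with illustrative class and file names:

import java.util.concurrent.TimeUnit;

import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceTimingSketch {
  private static final Logger logger = LoggerFactory.getLogger(TraceTimingSketch.class);

  public static void main(String[] args) {
    // Modern equivalent of the patch's "new Stopwatch(); timer.start();"
    Stopwatch timer = Stopwatch.createStarted();
    busyWork(); // stands in for ParquetFileReader.readFooter(conf, path)
    long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
    // Same record shape as the footer trace above: marker, operation, then
    // fixed fields ending in elapsed microseconds.
    logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}",
        "", "some/file.parquet", "", 0, 0, 0, timeToRead);
  }

  private static void busyWork() {
    long acc = 0;
    for (int i = 0; i < 1_000_000; i++) {
      acc += i;
    }
    System.out.println(acc); // keep the loop observable
  }
}

Keeping every record on one line behind a fixed leading marker means a single
grep for "ParquetTrace" pulls the complete trace out of the log.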
http://git-wip-us.apache.org/repos/asf/drill/blob/1e45f9fa/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 762f91a..bcc7b33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -26,10 +26,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.store.parquet.ColumnDataReader;
 import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,6 +54,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.PrimitiveType;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 
 // class to keep track of the read position of variable length columns
 final class PageReader {
@@ -98,22 +101,26 @@ final class PageReader {
 
   int currentPageCount = -1;
 
+  private FSDataInputStream inputStream;
+
   // These need to be held throughout reading of the entire column chunk
   List<ByteBuf> allocatedDictionaryBuffers;
 
   private final CodecFactory codecFactory;
 
+  private final ParquetReaderStats stats;
+
   PageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
     throws ExecutionSetupException{
     this.parentColumnReader = parentStatus;
     allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
     codecFactory = parentColumnReader.parentReader.getCodecFactory();
-
+    this.stats = parentColumnReader.parentReader.parquetReaderStats;
     long start = columnChunkMetaData.getFirstDataPageOffset();
     try {
-      FSDataInputStream f = fs.open(path);
-      this.dataReader = new ColumnDataReader(f, start, columnChunkMetaData.getTotalSize());
-      loadDictionaryIfExists(parentStatus, columnChunkMetaData, f);
+      inputStream = fs.open(path);
+      this.dataReader = new ColumnDataReader(inputStream, start, columnChunkMetaData.getTotalSize());
+      loadDictionaryIfExists(parentStatus, columnChunkMetaData, inputStream);
 
     } catch (IOException e) {
       throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
@@ -124,9 +131,15 @@ final class PageReader {
 
   private void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
       final ColumnChunkMetaData columnChunkMetaData, final FSDataInputStream f) throws IOException {
+    Stopwatch timer = new Stopwatch();
     if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
       f.seek(columnChunkMetaData.getDictionaryPageOffset());
+      long start = f.getPos();
+      timer.start();
       final PageHeader pageHeader = Util.readPageHeader(f);
+      long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+      long pageHeaderBytes = f.getPos() - start;
+      this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes);
       assert pageHeader.type == PageType.DICTIONARY_PAGE;
       readDictionaryPage(pageHeader, parentStatus);
     }
@@ -138,7 +151,7 @@ final class PageReader {
     int uncompressedSize = pageHeader.getUncompressed_page_size();
 
     final DrillBuf dictionaryData = allocateDictionaryBuffer(uncompressedSize);
-    readPage(compressedSize, uncompressedSize, dictionaryData);
+    readPage(pageHeader, compressedSize, uncompressedSize, dictionaryData);
 
     DictionaryPage page = new DictionaryPage(
         asBytesInput(dictionaryData, 0, uncompressedSize),
@@ -149,20 +162,30 @@ final class PageReader {
     this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
   }
 
-  public void readPage(int compressedSize, int uncompressedSize, DrillBuf dest) throws IOException {
+  public void readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize, DrillBuf dest) throws IOException {
+    Stopwatch timer = new Stopwatch();
+    long timeToRead;
+    long start = inputStream.getPos();
     if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) {
+      timer.start();
       dataReader.loadPage(dest, compressedSize);
+      timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+      this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, uncompressedSize);
     } else {
       final DrillBuf compressedData = allocateTemporaryBuffer(compressedSize);
       try {
-        dataReader.loadPage(compressedData, compressedSize);
-        codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
-            .getCodec()).decompress(
-            compressedData.nioBuffer(0, compressedSize),
-            compressedSize,
-            dest.nioBuffer(0, uncompressedSize),
-            uncompressedSize);
-
+        timer.start();
+        dataReader.loadPage(compressedData, compressedSize);
+        timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+        timer.reset();
+        this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, compressedSize);
+        start = inputStream.getPos();
+        timer.start();
+        codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
+            .getCodec()).decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
+            dest.nioBuffer(0, uncompressedSize), uncompressedSize);
+        timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+        this.updateStats(pageHeader, "Decompress", start, timeToRead, compressedSize, uncompressedSize);
       } finally {
         compressedData.release();
       }
@@ -180,7 +203,7 @@ final class PageReader {
    * @throws java.io.IOException
    */
   public boolean next() throws IOException {
-
+    Stopwatch timer = new Stopwatch();
     currentPageCount = -1;
     valuesRead = 0;
     valuesReadyToRead = 0;
@@ -196,7 +219,15 @@ final class PageReader {
     // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
     // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
     do {
+      long start = inputStream.getPos();
+      timer.start();
       pageHeader = dataReader.readPageHeader();
+      long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+      this.updateStats(pageHeader, "Page Header Read", start, timeToRead, 0, 0);
+      logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}","Page Header Read","",
+          this.parentColumnReader.parentReader.hadoopPath,
+          this.parentColumnReader.columnDescriptor.toString(), start, 0, 0, timeToRead);
+      timer.reset();
       if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
         readDictionaryPage(pageHeader, parentColumnReader);
       }
@@ -207,7 +238,7 @@ final class PageReader {
     allocatePageData(pageHeader.getUncompressed_page_size());
     int compressedSize = pageHeader.getCompressed_page_size();
     int uncompressedSize = pageHeader.getUncompressed_page_size();
-    readPage(compressedSize, uncompressedSize, pageData);
+    readPage(pageHeader, compressedSize, uncompressedSize, pageData);
 
     currentPageCount = pageHeader.data_page_header.num_values;
 
@@ -298,6 +329,37 @@ final class PageReader {
     return currentPageCount != -1;
   }
 
+  private void updateStats(PageHeader pageHeader, String op, long start, long time, long bytesin, long bytesout) {
+    String pageType = "Data Page";
+    if (pageHeader.type == PageType.DICTIONARY_PAGE) {
+      pageType = "Dictionary Page";
+    }
+    logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}", op, pageType.toString(),
+        this.parentColumnReader.parentReader.hadoopPath,
+        this.parentColumnReader.columnDescriptor.toString(), start, bytesin, bytesout, time);
+    if (pageHeader.type != PageType.DICTIONARY_PAGE) {
+      if (bytesin == bytesout) {
+        this.stats.timePageLoads += time;
+        this.stats.numPageLoads++;
+        this.stats.totalPageReadBytes += bytesin;
+      } else {
+        this.stats.timePagesDecompressed += time;
+        this.stats.numPagesDecompressed++;
+        this.stats.totalDecompressedBytes += bytesin;
+      }
+    } else {
+      if (bytesin == bytesout) {
+        this.stats.timeDictPageLoads += time;
+        this.stats.numDictPageLoads++;
+        this.stats.totalDictPageReadBytes += bytesin;
+      } else {
+        this.stats.timeDictPagesDecompressed += time;
+        this.stats.numDictPagesDecompressed++;
+        this.stats.totalDictDecompressedBytes += bytesin;
+      }
+    }
+  }
+
   public void clearBuffers() {
     if (pageData != null) {
       pageData.release();

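Every per-page record that updateStats() emits has the fixed shape
ParquetTrace,<op>,<page type>,<file>,<column>,<start offset>,<bytes in>,
<bytes out>,<microseconds>, which makes offline aggregation straightforward.
A rough sketch of such a post-processor, assuming the trace output was
captured to a log file and that no field contains an embedded comma (the
column descriptor's toString() could violate that):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class ParquetTraceSummary {
  public static void main(String[] args) throws IOException {
    Map<String, Long> microsByOp = new HashMap<>();
    try (BufferedReader in = new BufferedReader(new FileReader(args[0]))) {
      String line;
      while ((line = in.readLine()) != null) {
        int at = line.indexOf("ParquetTrace,");
        if (at < 0) {
          continue;
        }
        // marker, op, pageType, file, column, start, bytesIn, bytesOut, micros
        String[] f = line.substring(at).split(",");
        if (f.length != 9) {
          continue; // skip the multi-field Summary record
        }
        String key = f[1] + " / " + f[2]; // e.g. "Page Read / Data Page"
        microsByOp.merge(key, Long.parseLong(f[8].trim()), Long::sum);
      }
    }
    for (Map.Entry<String, Long> e : microsByOp.entrySet()) {
      System.out.println(e.getKey() + ": " + e.getValue() + " us");
    }
  }
}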
http://git-wip-us.apache.org/repos/asf/drill/blob/1e45f9fa/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 47d502f..7131b6c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -34,11 +34,13 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -105,13 +107,15 @@ public class ParquetRecordReader extends AbstractRecordReader {
   long totalRecordsRead;
   private final FragmentContext fragmentContext;
 
+  public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
+
   public ParquetRecordReader(FragmentContext fragmentContext,
       String path,
       int rowGroupIndex,
       FileSystem fs,
       CodecFactory codecFactory,
       ParquetMetadata footer,
-                             List<SchemaPath> columns) throws ExecutionSetupException {
+      List<SchemaPath> columns) throws ExecutionSetupException {
     this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer,
         columns);
   }
@@ -479,5 +483,29 @@ public class ParquetRecordReader extends AbstractRecordReader {
       varLengthReader.columns.clear();
       varLengthReader = null;
     }
+
+    if (parquetReaderStats != null) {
+      logger.trace("ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
+          hadoopPath,
+          parquetReaderStats.numDictPageHeaders,
+          parquetReaderStats.numPageHeaders,
+          parquetReaderStats.numDictPageLoads,
+          parquetReaderStats.numPageLoads,
+          parquetReaderStats.numDictPagesDecompressed,
+          parquetReaderStats.numPagesDecompressed,
+          parquetReaderStats.totalDictPageHeaderBytes,
+          parquetReaderStats.totalPageHeaderBytes,
+          parquetReaderStats.totalDictPageReadBytes,
+          parquetReaderStats.totalPageReadBytes,
+          parquetReaderStats.totalDictDecompressedBytes,
+          parquetReaderStats.totalDecompressedBytes,
+          parquetReaderStats.timeDictPageHeaders,
+          parquetReaderStats.timePageHeaders,
+          parquetReaderStats.timeDictPageLoads,
+          parquetReaderStats.timePageLoads,
+          parquetReaderStats.timeDictPagesDecompressed,
+          parquetReaderStats.timePagesDecompressed);
+      parquetReaderStats = null;
+    }
   }
 }
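
None of this output appears unless trace level is enabled for these classes.
With the logback-based logging configuration Drill ships, the usual route is a
logger entry in logback.xml; the programmatic equivalent below is a sketch
(the class name is illustrative) that raises the whole parquet store package,
covering ParquetScanBatchCreator, PageReader, and ParquetRecordReader alike:

import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;

public class EnableParquetTrace {
  public static void enable() {
    // Valid only when the slf4j binding on the classpath is logback-classic.
    Logger parquet = (Logger) LoggerFactory.getLogger("org.apache.drill.exec.store.parquet");
    parquet.setLevel(Level.TRACE);
  }
}

Once enabled, the per-page, footer, and summary records can all be extracted
with a single grep for "ParquetTrace".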