Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/30 21:10:14 UTC

[1/3] incubator-impala git commit: IMPALA-4223: Handle truncated file read from HDFS cache

Repository: incubator-impala
Updated Branches:
  refs/heads/master 6d1a130d8 -> 29faca568


IMPALA-4223: Handle truncated file read from HDFS cache

When Hive overwrites files on HDFS, Impala can see a partially written,
cached file. Previously, we did not correctly handle such a partial
cached read.

This change adds a check and triggers a fallback to disk reads for such
errors. If the file is also only partially written on disk, the query
will report a file corruption warning through the disk read path.
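The check described above boils down to comparing the number of bytes the cache returned with the scan range length and choosing the disk path on a short read. A minimal sketch, assuming a hypothetical CachedBuffer struct in place of the HDFS zero-copy buffer; AcceptCachedRead and ChooseReadPath are illustrative names, not Impala functions.

```cpp
#include <cassert>
#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical stand-in for a zero-copy buffer returned by the HDFS cache.
struct CachedBuffer {
  const void* data;
  int64_t length;  // number of bytes the cache actually returned
};

// Returns true if 'buf' covers the whole scan range of 'expected_len' bytes.
// A shorter buffer (e.g. a file truncated while Hive overwrites it) is logged
// and rejected so that the caller can fall back to the disk read path.
bool AcceptCachedRead(const std::string& file, const CachedBuffer& buf,
                      int64_t expected_len) {
  assert(expected_len >= 0);
  if (buf.length < expected_len) {
    std::cerr << "Error reading file from HDFS cache: " << file << ". Expected "
              << expected_len << " bytes, but read " << buf.length
              << ". Switching to disk read path.\n";
    return false;
  }
  return true;
}

// Hypothetical dispatcher: use the cached buffer when it is complete,
// otherwise report that the range must be re-read from disk.
std::string ChooseReadPath(const std::string& file, const CachedBuffer& cached,
                           int64_t expected_len) {
  return AcceptCachedRead(file, cached, expected_len) ? "cache" : "disk";
}
```

In the patch itself the scan range is closed and 'read_succeeded' stays false, which signals the caller to retry without the cache; the sketch only models that decision.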

Change-Id: Id1e1fdb0211819c5938956abb13b512350a46f1a
Reviewed-on: http://gerrit.cloudera.org:8080/4828
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Marcel Kornacker <ma...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/12e34b4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/12e34b4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/12e34b4c

Branch: refs/heads/master
Commit: 12e34b4c10e0685a24ad96ddbc82cd7a395474be
Parents: 6d1a130
Author: Lars Volker <lv...@cloudera.com>
Authored: Thu Oct 6 15:07:52 2016 +0200
Committer: Marcel Kornacker <ma...@cloudera.com>
Committed: Sun Oct 30 19:35:50 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-scan-range.cc | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/12e34b4c/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 89092a9..8ebe7a5 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -436,13 +436,21 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
     return Status::OK();
   }
 
-  // Cached read succeeded.
+  // Cached read returned a buffer, verify we read the correct amount of data.
   void* buffer = const_cast<void*>(hadoopRzBufferGet(cached_buffer_));
   int32_t bytes_read = hadoopRzBufferLength(cached_buffer_);
-  // For now, entire the entire block is cached or none of it.
-  // TODO: if HDFS ever changes this, we'll have to handle the case where half
-  // the block is cached.
-  DCHECK_EQ(bytes_read, len());
+  // A partial read can happen when files are truncated.
+  // TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
+  // between errors and partially cached blocks here.
+  if (bytes_read < len()) {
+    stringstream ss;
+    VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected "
+      << len() << " bytes, but read " << bytes_read << ". Switching to disk read path.";
+    // Close the scan range. 'read_succeeded' is still false, so the caller will fall back
+    // to non-cached read of this scan range.
+    Close();
+    return Status::OK();
+  }
 
   // Create a single buffer desc for the entire scan range and enqueue that.
   // 'mem_tracker' is NULL because the memory is owned by the HDFS java client,


[2/3] incubator-impala git commit: IMPALA-3346: DeepCopy() Kudu rows into Impala tuples.

IMPALA-3346: DeepCopy() Kudu rows into Impala tuples.

Implements additional changes to make the memory layout
of Kudu rows identical to that of Impala tuples.
In particular, Kudu rows allocate a null bit even for
non-nullable columns, and Impala now does the same
for Kudu scan tuples.
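The null-bit accounting this implies can be sketched as follows. SlotInfo and NumNullBytes are hypothetical simplifications of the frontend's TupleDescriptor layout logic, assuming one null bit per nullable slot plus one per non-nullable Kudu scan slot, rounded up to whole bytes.

```cpp
#include <cassert>
#include <vector>

// Hypothetical, simplified slot description for illustration only.
struct SlotInfo {
  bool nullable;
  bool kudu_scan_slot;  // slot backed by a Kudu column
};

// One bit per nullable slot, plus one bit per non-nullable Kudu scan slot,
// because Kudu's client row format reserves a null bit for every column.
int NumNullBytes(const std::vector<SlotInfo>& slots) {
  int num_null_bits = 0;
  for (const SlotInfo& s : slots) {
    if (s.nullable || s.kudu_scan_slot) ++num_null_bits;
  }
  return (num_null_bits + 7) / 8;  // round up to whole bytes
}
```

For example, nine non-nullable Kudu columns now need two null bytes, whereas nine non-nullable HDFS columns still need none.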

This change exploits the now-identical Kudu and Impala
tuple layouts to avoid the expensive slot-by-slot translation
of Kudu rows into Impala tuples.
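With identical layouts, copying a Kudu row reduces to one memcpy of the fixed-length part followed by relocating variable-length string data into memory owned by the row batch, in the spirit of Tuple::DeepCopy(). A minimal sketch, assuming a pointer-plus-length string slot (StringSlot) and a hypothetical SimplePool in place of MemPool; for brevity it assumes no string slot is NULL, whereas a real deep copy would skip null slots.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

// Hypothetical string slot: pointer + length (layout is an assumption).
struct StringSlot {
  char* ptr;
  int len;
};

// Minimal owning memory pool, a stand-in for MemPool.
class SimplePool {
 public:
  char* Allocate(size_t n) {
    chunks_.push_back(std::make_unique<char[]>(n == 0 ? 1 : n));
    return chunks_.back().get();
  }

 private:
  std::vector<std::unique_ptr<char[]>> chunks_;
};

// Copies a tuple whose layout already matches the destination layout:
// one memcpy for the fixed-length part, then relocation of each string's
// bytes into 'pool' so the copy no longer points into the source buffer.
void DeepCopyTuple(const uint8_t* src, uint8_t* dst, size_t byte_size,
                   const std::vector<size_t>& string_offsets, SimplePool* pool) {
  std::memcpy(dst, src, byte_size);
  for (size_t off : string_offsets) {
    StringSlot sv;
    std::memcpy(&sv, dst + off, sizeof(sv));
    char* copy = pool->Allocate(sv.len);
    if (sv.len > 0) std::memcpy(copy, sv.ptr, sv.len);
    sv.ptr = copy;
    std::memcpy(dst + off, &sv, sizeof(sv));
  }
}
```

Conjuncts can be evaluated on the source tuple first, so only surviving rows pay for the copy, which mirrors the order used in the new DecodeRowsIntoRowBatch().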

Perf: Mostafa reported a 50% efficiency gain on full
table scans.

Testing: A private core/hdfs run passed.

TODO:
1) Test cases with nullable/nonnullable non-PK slots.
2) Specify mem layout to client (depends on KUDU-1694)
3) Avoid mem copies (depends on KUDU-1695)

Change-Id: Ic911e4eff9fe98bf28d8a1bab5c9d7e9ab66d9cb
Reviewed-on: http://gerrit.cloudera.org:8080/4862
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Marcel Kornacker <ma...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9f3f4b71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9f3f4b71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9f3f4b71

Branch: refs/heads/master
Commit: 9f3f4b713d78e8dcf02c02def447195a04f408e6
Parents: 12e34b4
Author: Alex Behm <al...@cloudera.com>
Authored: Mon Oct 24 20:46:34 2016 -0700
Committer: Marcel Kornacker <ma...@cloudera.com>
Committed: Sun Oct 30 19:36:10 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scanner.cc                     | 183 +++----------------
 be/src/exec/kudu-scanner.h                      |  28 ---
 .../apache/impala/analysis/SlotDescriptor.java  |   4 +-
 .../apache/impala/analysis/TupleDescriptor.java |  41 +++--
 .../org/apache/impala/planner/KuduScanNode.java |  13 +-
 5 files changed, 67 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index ca4ee9a..d230985 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -58,19 +58,10 @@ KuduScanner::KuduScanner(KuduScanNode* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
     state_(state),
     cur_kudu_batch_num_read_(0),
-    last_alive_time_micros_(0),
-    num_string_slots_(0) {
+    last_alive_time_micros_(0) {
 }
 
 Status KuduScanner::Open() {
-  // Store columns that need relocation when materialized into the
-  // destination row batch.
-  for (int i = 0; i < scan_node_->tuple_desc_->slots().size(); ++i) {
-    if (scan_node_->tuple_desc_->slots()[i]->type().IsStringType()) {
-      string_slots_.push_back(scan_node_->tuple_desc_->slots()[i]);
-      ++num_string_slots_;
-    }
-  }
   return scan_node_->GetConjunctCtxs(&conjunct_ctxs_);
 }
 
@@ -109,7 +100,7 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) {
     RETURN_IF_CANCELLED(state_);
 
     if (cur_kudu_batch_num_read_ < cur_kudu_batch_.NumRows()) {
-      bool batch_done = false;
+      bool batch_done;
       RETURN_IF_ERROR(DecodeRowsIntoRowBatch(row_batch, &tuple, &batch_done));
       if (batch_done) break;
     }
@@ -172,54 +163,43 @@ Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch, bool* batch_done)
   return Status::OK();
 }
 
-Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch,
-    Tuple** tuple_mem, bool* batch_done) {
+Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem,
+    bool* batch_done) {
+  *batch_done = false;
 
   // Short-circuit the count(*) case.
   if (scan_node_->tuple_desc_->slots().empty()) {
     return HandleEmptyProjection(row_batch, batch_done);
   }
 
-  // TODO consider consolidating the tuple creation/initialization here with the version
-  // that happens inside the loop.
-  int idx = row_batch->AddRow();
-  TupleRow* row = row_batch->GetRow(idx);
-  (*tuple_mem)->ClearNullBits(*scan_node_->tuple_desc());
-  row->SetTuple(tuple_idx(), *tuple_mem);
-
+  // Iterate through the Kudu rows, evaluate conjuncts and deep-copy survivors into
+  // 'row_batch'.
+  bool has_conjuncts = !conjunct_ctxs_.empty();
   int num_rows = cur_kudu_batch_.NumRows();
-  // Now iterate through the Kudu rows.
   for (int krow_idx = cur_kudu_batch_num_read_; krow_idx < num_rows; ++krow_idx) {
-    // Clear any NULL indicators set by a previous iteration.
-    (*tuple_mem)->ClearNullBits(*scan_node_->tuple_desc());
-
-    // Transform a Kudu row into an Impala row.
+    // Evaluate the conjuncts that haven't been pushed down to Kudu. Conjunct evaluation
+    // is performed directly on the Kudu tuple because its memory layout is identical to
+    // Impala's. We only copy the surviving tuples to Impala's output row batch.
     KuduScanBatch::RowPtr krow = cur_kudu_batch_.Row(krow_idx);
-    RETURN_IF_ERROR(KuduRowToImpalaTuple(krow, row_batch, *tuple_mem));
+    Tuple* kudu_tuple = reinterpret_cast<Tuple*>(const_cast<void*>(krow.cell(0)));
     ++cur_kudu_batch_num_read_;
-
-    // Evaluate the conjuncts that haven't been pushed down to Kudu.
-    if (conjunct_ctxs_.empty() ||
-        ExecNode::EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) {
-      // Materialize those slots that require auxiliary memory
-      RETURN_IF_ERROR(RelocateValuesFromKudu(*tuple_mem, row_batch->tuple_data_pool()));
-      // If the conjuncts pass on the row commit it.
-      row_batch->CommitLastRow();
-      // If we've reached the capacity, or the LIMIT for the scan, return.
-      if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) {
-        *batch_done = true;
-        break;
-      }
-      // Add another row.
-      idx = row_batch->AddRow();
-
-      // Move to the next tuple in the tuple buffer.
-      *tuple_mem = next_tuple(*tuple_mem);
-      (*tuple_mem)->ClearNullBits(*scan_node_->tuple_desc());
-      // Make 'row' point to the new row.
-      row = row_batch->GetRow(idx);
-      row->SetTuple(tuple_idx(), *tuple_mem);
+    if (has_conjuncts && !ExecNode::EvalConjuncts(&conjunct_ctxs_[0],
+        conjunct_ctxs_.size(), reinterpret_cast<TupleRow*>(&kudu_tuple))) {
+      continue;
+    }
+    // Deep copy the tuple, set it in a new row, and commit the row.
+    kudu_tuple->DeepCopy(*tuple_mem, *scan_node_->tuple_desc(),
+        row_batch->tuple_data_pool());
+    TupleRow* row = row_batch->GetRow(row_batch->AddRow());
+    row->SetTuple(0, *tuple_mem);
+    row_batch->CommitLastRow();
+    // If we've reached the capacity, or the LIMIT for the scan, return.
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) {
+      *batch_done = true;
+      break;
     }
+    // Move to the next tuple in the tuple buffer.
+    *tuple_mem = next_tuple(*tuple_mem);
   }
   ExprContext::FreeLocalAllocations(conjunct_ctxs_);
 
@@ -227,113 +207,6 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch,
   return state_->GetQueryStatus();
 }
 
-void KuduScanner::SetSlotToNull(Tuple* tuple, const SlotDescriptor& slot) {
-  DCHECK(slot.is_nullable());
-  tuple->SetNull(slot.null_indicator_offset());
-}
-
-bool KuduScanner::IsSlotNull(Tuple* tuple, const SlotDescriptor& slot) {
-  return slot.is_nullable() && tuple->IsNull(slot.null_indicator_offset());
-}
-
-Status KuduScanner::RelocateValuesFromKudu(Tuple* tuple, MemPool* mem_pool) {
-  for (int i = 0; i < num_string_slots_; ++i) {
-    const SlotDescriptor* slot = string_slots_[i];
-    // NULL handling was done in KuduRowToImpalaTuple.
-    if (IsSlotNull(tuple, *slot)) continue;
-
-    // Extract the string value.
-    void* slot_ptr = tuple->GetSlot(slot->tuple_offset());
-    DCHECK(slot->type().IsVarLenStringType());
-
-    // The string value of the slot has a pointer to memory from the Kudu row.
-    StringValue* val = reinterpret_cast<StringValue*>(slot_ptr);
-    char* old_buf = val->ptr;
-    // Kudu never returns values larger than 8MB
-    DCHECK_LE(val->len, 8 * (1 << 20));
-    val->ptr = reinterpret_cast<char*>(mem_pool->TryAllocate(val->len));
-    if (LIKELY(val->len > 0)) {
-      // The allocator returns a NULL ptr when out of memory.
-      if (UNLIKELY(val->ptr == NULL)) {
-        return mem_pool->mem_tracker()->MemLimitExceeded(state_,
-            "Kudu scanner could not allocate memory for string", val->len);
-      }
-      memcpy(val->ptr, old_buf, val->len);
-    }
-  }
-  return Status::OK();
-}
-
-
-Status KuduScanner::KuduRowToImpalaTuple(const KuduScanBatch::RowPtr& row,
-    RowBatch* row_batch, Tuple* tuple) {
-  for (int i = 0; i < scan_node_->tuple_desc_->slots().size(); ++i) {
-    const SlotDescriptor* info = scan_node_->tuple_desc_->slots()[i];
-    void* slot = tuple->GetSlot(info->tuple_offset());
-
-    if (row.IsNull(i)) {
-      SetSlotToNull(tuple, *info);
-      continue;
-    }
-
-    int max_len = -1;
-    switch (info->type().type) {
-      case TYPE_VARCHAR:
-        max_len = info->type().len;
-        DCHECK_GT(max_len, 0);
-        // Fallthrough intended.
-      case TYPE_STRING: {
-        // For types with auxiliary memory (String, Binary,...) store the original memory
-        // location in the tuple to avoid the copy when the conjuncts do not pass. Relocate
-        // the memory into the row batch's memory in a later step.
-        kudu::Slice slice;
-        KUDU_RETURN_IF_ERROR(row.GetString(i, &slice),
-            "Error getting column value from Kudu.");
-        StringValue* sv = reinterpret_cast<StringValue*>(slot);
-        sv->ptr = const_cast<char*>(reinterpret_cast<const char*>(slice.data()));
-        sv->len = static_cast<int>(slice.size());
-        if (max_len > 0) sv->len = std::min(sv->len, max_len);
-        break;
-      }
-      case TYPE_TINYINT:
-        KUDU_RETURN_IF_ERROR(row.GetInt8(i, reinterpret_cast<int8_t*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_SMALLINT:
-        KUDU_RETURN_IF_ERROR(row.GetInt16(i, reinterpret_cast<int16_t*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_INT:
-        KUDU_RETURN_IF_ERROR(row.GetInt32(i, reinterpret_cast<int32_t*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_BIGINT:
-        KUDU_RETURN_IF_ERROR(row.GetInt64(i, reinterpret_cast<int64_t*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_FLOAT:
-        KUDU_RETURN_IF_ERROR(row.GetFloat(i, reinterpret_cast<float*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_DOUBLE:
-        KUDU_RETURN_IF_ERROR(row.GetDouble(i, reinterpret_cast<double*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      case TYPE_BOOLEAN:
-        KUDU_RETURN_IF_ERROR(row.GetBool(i, reinterpret_cast<bool*>(slot)),
-            "Error getting column value from Kudu.");
-        break;
-      default:
-        DCHECK(false) << "Impala type unsupported in Kudu: "
-            << TypeToString(info->type().type);
-        return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING,
-            TypeToString(info->type().type));
-    }
-  }
-  return Status::OK();
-}
-
-
 Status KuduScanner::GetNextScannerBatch() {
   SCOPED_TIMER(state_->total_storage_wait_timer());
   int64_t now = MonotonicMicros();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index 0ed5221..bf84b08 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -63,12 +63,6 @@ class KuduScanner {
   /// Does this by adding sets of rows to 'row_batch' instead of adding one-by-one.
   Status HandleEmptyProjection(RowBatch* row_batch, bool* batch_done);
 
-  /// Set 'slot' to Null in 'tuple'.
-  void SetSlotToNull(Tuple* tuple, const SlotDescriptor& slot);
-
-  /// Returns true if 'slot' is Null in 'tuple'.
-  bool IsSlotNull(Tuple* tuple, const SlotDescriptor& slot);
-
   /// Decodes rows previously fetched from kudu, now in 'cur_rows_' into a RowBatch.
   ///  - 'batch' is the batch that will point to the new tuples.
   ///  - *tuple_mem should be the location to output tuples.
@@ -82,26 +76,11 @@ class KuduScanner {
   /// Closes the current kudu::client::KuduScanner.
   void CloseCurrentClientScanner();
 
-  /// Given a tuple, copies the values of those columns that require additional memory
-  /// from memory owned by the kudu::client::KuduScanner into memory owned by the
-  /// RowBatch. Assumes that the other columns are already materialized.
-  Status RelocateValuesFromKudu(Tuple* tuple, MemPool* mem_pool);
-
-  /// Transforms a kudu row into an Impala row. Columns that don't require auxiliary
-  /// memory are copied to the tuple directly. String columns are stored as a reference to
-  /// the memory of the RowPtr and need to be relocated later.
-  Status KuduRowToImpalaTuple(const kudu::client::KuduScanBatch::RowPtr& row,
-      RowBatch* row_batch, Tuple* tuple);
-
   inline Tuple* next_tuple(Tuple* t) const {
     uint8_t* mem = reinterpret_cast<uint8_t*>(t);
     return reinterpret_cast<Tuple*>(mem + scan_node_->tuple_desc()->byte_size());
   }
 
-  /// Returns the tuple idx into the row for this scan node to output to.
-  /// Currently this is always 0.
-  int tuple_idx() const { return 0; }
-
   KuduScanNode* scan_node_;
   RuntimeState* state_;
 
@@ -120,13 +99,6 @@ class KuduScanner {
 
   /// The scanner's cloned copy of the conjuncts to apply.
   vector<ExprContext*> conjunct_ctxs_;
-
-  /// List of string slots that need relocation for their auxiliary memory.
-  std::vector<SlotDescriptor*> string_slots_;
-
-  /// Number of string slots that need relocation (i.e. size of string_slots_), stored
-  /// separately to avoid calling vector::size() in the hot path (IMPALA-3348).
-  int num_string_slots_;
 };
 
 } /// namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index 3a0fc06..9a9c058 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.thrift.TSlotDescriptor;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -140,9 +141,8 @@ public class SlotDescriptor {
   }
 
   public Path getPath() { return path_; }
-
   public boolean isScanSlot() { return path_ != null && path_.isRootedAtTable(); }
-
+  public boolean isKuduScanSlot() { return getColumn() instanceof KuduColumn; }
   public Column getColumn() { return !isScanSlot() ? null : path_.destColumn(); }
 
   public ColumnStats getStats() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index 5fbe5f6..bf6b93a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -19,6 +19,7 @@ package org.apache.impala.analysis;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,7 +29,6 @@ import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Table;
-import org.apache.impala.catalog.View;
 import org.apache.impala.thrift.TTupleDescriptor;
 
 import com.google.common.base.Joiner;
@@ -59,8 +59,9 @@ import com.google.common.collect.Lists;
  *
  * Memory Layout
  * Slots are placed in descending order by size with trailing bytes to store null flags.
- * Null flags are omitted for non-nullable slots. There is no padding between tuples when
- * stored back-to-back in a row batch.
+ * Null flags are omitted for non-nullable slots, except for Kudu scan slots which always
+ * have a null flag to match Kudu's client row format. There is no padding between tuples
+ * when stored back-to-back in a row batch.
  *
  * Example: select bool_col, int_col, string_col, smallint_col from functional.alltypes
  * Slots:   string_col|int_col|smallint_col|bool_col|null_byte
@@ -118,6 +119,21 @@ public class TupleDescriptor {
     return result;
   }
 
+  /**
+   * Returns all materialized slots ordered by their offset. Valid to call after the
+   * mem layout has been computed.
+   */
+  public ArrayList<SlotDescriptor> getSlotsOrderedByOffset() {
+    Preconditions.checkState(hasMemLayout_);
+    ArrayList<SlotDescriptor> result = getMaterializedSlots();
+    Collections.sort(result, new Comparator<SlotDescriptor> () {
+      public int compare(SlotDescriptor a, SlotDescriptor b) {
+        return Integer.compare(a.getByteOffset(), b.getByteOffset());
+      }
+    });
+    return result;
+  }
+
   public Table getTable() {
     if (path_ == null) return null;
     return path_.getRootTable();
@@ -199,9 +215,7 @@ public class TupleDescriptor {
    * Materialize all slots.
    */
   public void materializeSlots() {
-    for (SlotDescriptor slot: slots_) {
-      slot.setIsMaterialized(true);
-    }
+    for (SlotDescriptor slot: slots_) slot.setIsMaterialized(true);
   }
 
   public TTupleDescriptor toThrift(Integer tableId) {
@@ -223,7 +237,7 @@ public class TupleDescriptor {
         new HashMap<Integer, List<SlotDescriptor>>();
 
     // populate slotsBySize
-    int numNullableSlots = 0;
+    int numNullBits = 0;
     int totalSlotSize = 0;
     for (SlotDescriptor d: slots_) {
       if (!d.isMaterialized()) continue;
@@ -239,14 +253,14 @@ public class TupleDescriptor {
       }
       totalSlotSize += d.getType().getSlotSize();
       slotsBySize.get(d.getType().getSlotSize()).add(d);
-      if (d.getIsNullable()) ++numNullableSlots;
+      if (d.getIsNullable() || d.isKuduScanSlot()) ++numNullBits;
     }
     // we shouldn't have anything of size <= 0
     Preconditions.checkState(!slotsBySize.containsKey(0));
     Preconditions.checkState(!slotsBySize.containsKey(-1));
 
     // assign offsets to slots in order of descending size
-    numNullBytes_ = (numNullableSlots + 7) / 8;
+    numNullBytes_ = (numNullBits + 7) / 8;
     int slotOffset = 0;
     int nullIndicatorByte = totalSlotSize;
     int nullIndicatorBit = 0;
@@ -266,13 +280,16 @@ public class TupleDescriptor {
         slotOffset += slotSize;
 
         // assign null indicator
-        if (d.getIsNullable()) {
+        if (d.getIsNullable() || d.isKuduScanSlot()) {
           d.setNullIndicatorByte(nullIndicatorByte);
           d.setNullIndicatorBit(nullIndicatorBit);
           nullIndicatorBit = (nullIndicatorBit + 1) % 8;
           if (nullIndicatorBit == 0) ++nullIndicatorByte;
-        } else {
-          // non-nullable slots will have 0 for the byte offset and -1 for the bit mask
+        }
+        // non-nullable slots have 0 for the byte offset and -1 for the bit mask
+        // to make sure IS NULL always evaluates to false in the BE without having
+        // to check nullability explicitly
+        if (!d.getIsNullable()) {
           d.setNullIndicatorBit(-1);
           d.setNullIndicatorByte(0);
         }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 9434801..d338608 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -118,13 +118,16 @@ public class KuduScanNode extends ScanNode {
       // Materialize the slots of the remaining conjuncts (i.e. those not pushed to Kudu)
       analyzer.materializeSlots(conjuncts_);
 
+      // Compute mem layout before the scan range locations because creation of the Kudu
+      // scan tokens depends on having a mem layout.
+      computeMemLayout(analyzer);
+
       // Creates Kudu scan tokens and sets the scan range locations.
       computeScanRangeLocations(analyzer, client, rpcTable);
     } catch (Exception e) {
       throw new ImpalaRuntimeException("Unable to initialize the Kudu scan node", e);
     }
 
-    computeMemLayout(analyzer);
     computeStats(analyzer);
   }
 
@@ -189,15 +192,15 @@ public class KuduScanNode extends ScanNode {
 
   /**
    * Returns KuduScanTokens for this scan given the projected columns and predicates that
-   * will be pushed to Kudu.
+   * will be pushed to Kudu. The projected Kudu columns are ordered by offset in an
+   * Impala tuple to make the Impala and Kudu tuple layouts identical.
    */
   private List<KuduScanToken> createScanTokens(KuduClient client,
       org.apache.kudu.client.KuduTable rpcTable) {
     List<String> projectedCols = Lists.newArrayList();
-    for (SlotDescriptor desc: getTupleDesc().getSlots()) {
-      if (desc.isMaterialized()) projectedCols.add(desc.getColumn().getName());
+    for (SlotDescriptor desc: getTupleDesc().getSlotsOrderedByOffset()) {
+      projectedCols.add(desc.getColumn().getName());
     }
-
     KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable);
     tokenBuilder.setProjectedColumnNames(projectedCols);
     for (KuduPredicate predicate: kuduPredicates_) tokenBuilder.addPredicate(predicate);


[3/3] incubator-impala git commit: IMPALA-3823: Add timer to measure Parquet footer reads

IMPALA-3823: Add timer to measure Parquet footer reads

It has been observed that Parquet footer reads perform poorly, especially
when reading from S3. This patch adds a timer "FooterProcessingTime"
that tracks the min, max and average time each split of each scan node
spends reading and processing the Parquet footer.
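Measuring one split amounts to starting a monotonic clock before footer processing and stopping it afterwards. A minimal sketch using std::chrono::steady_clock; SimpleStopWatch and TimeOnce are hypothetical names that only mimic the Start()/Stop()/ElapsedTime() pattern of Impala's MonotonicStopWatch.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Minimal monotonic stopwatch (illustrative, not Impala's class).
class SimpleStopWatch {
 public:
  void Start() {
    start_ = std::chrono::steady_clock::now();
    running_ = true;
  }
  void Stop() {
    if (!running_) return;
    elapsed_ns_ += std::chrono::duration_cast<std::chrono::nanoseconds>(
        std::chrono::steady_clock::now() - start_).count();
    running_ = false;
  }
  int64_t ElapsedTime() const { return elapsed_ns_; }  // nanoseconds

 private:
  std::chrono::steady_clock::time_point start_;
  int64_t elapsed_ns_ = 0;
  bool running_ = false;
};

// Times one call of 'fn' (e.g. a footer-processing step) and returns ns.
template <typename Fn>
int64_t TimeOnce(Fn&& fn) {
  SimpleStopWatch sw;
  sw.Start();
  fn();
  sw.Stop();
  return sw.ElapsedTime();
}
```

A steady clock is used because wall-clock time can jump backwards, which would corrupt per-split durations.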

Added a new utility counter called SummaryStatsCounter, which keeps
track of the min, max and average of all values seen so far. This
counter is used to calculate the min, max and average time taken to
read and process Parquet footers per query per node.
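The idea behind such a counter can be sketched as a running count, sum, min and max. SimpleSummaryStats is a hypothetical class modeled on the description above, not Impala's SummaryStatsCounter; the mutex assumes that several scanner threads may update it concurrently.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>
#include <mutex>

// Tracks min, max and average of raw values (illustrative sketch).
class SimpleSummaryStats {
 public:
  // Records one raw value, e.g. one split's footer processing time in ns.
  void UpdateCounter(int64_t value) {
    std::lock_guard<std::mutex> l(lock_);
    ++total_num_values_;
    sum_ += value;
    min_ = std::min(min_, value);
    max_ = std::max(max_, value);
  }

  // All accessors return 0 until at least one value has been recorded.
  int64_t MinValue() const {
    std::lock_guard<std::mutex> l(lock_);
    return total_num_values_ == 0 ? 0 : min_;
  }
  int64_t MaxValue() const {
    std::lock_guard<std::mutex> l(lock_);
    return total_num_values_ == 0 ? 0 : max_;
  }
  int64_t Average() const {  // truncating integer mean
    std::lock_guard<std::mutex> l(lock_);
    return total_num_values_ == 0 ? 0 : sum_ / total_num_values_;
  }
  int64_t TotalNumValues() const {
    std::lock_guard<std::mutex> l(lock_);
    return total_num_values_;
  }

 private:
  mutable std::mutex lock_;
  int64_t total_num_values_ = 0;
  int64_t sum_ = 0;
  int64_t min_ = std::numeric_limits<int64_t>::max();
  int64_t max_ = std::numeric_limits<int64_t>::min();
};
```

Unlike an averaged counter, which averages other counters, this keeps statistics over the raw values passed to UpdateCounter().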

The RuntimeProfile has also been updated to track, display and
serialize this new counter to Thrift.

BE tests have been added to verify that this counter works correctly.

Change-Id: Icf87bad90037dd0cea63b10c537382ec0f980cbf
Reviewed-on: http://gerrit.cloudera.org:8080/4371
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Marcel Kornacker <ma...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/29faca56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/29faca56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/29faca56

Branch: refs/heads/master
Commit: 29faca5680e34d9211eb8d551385b671198f626b
Parents: 9f3f4b7
Author: Sailesh Mukil <sa...@cloudera.com>
Authored: Tue Jul 5 13:11:35 2016 -0700
Committer: Marcel Kornacker <ma...@cloudera.com>
Committed: Sun Oct 30 19:36:13 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc    |  17 +++-
 be/src/exec/hdfs-parquet-scanner.h     |   3 +
 be/src/util/runtime-profile-counters.h |  74 ++++++++++++++---
 be/src/util/runtime-profile-test.cc    |  59 ++++++++++++++
 be/src/util/runtime-profile.cc         | 121 +++++++++++++++++++++++++++-
 be/src/util/runtime-profile.h          |  14 +++-
 common/thrift/RuntimeProfile.thrift    |  13 +++
 tests/query_test/test_scanners.py      |  18 +++++
 8 files changed, 303 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/29faca56/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 542f4cc..79925a4 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -150,6 +150,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
       metadata_range_(NULL),
       dictionary_pool_(new MemPool(scan_node->mem_tracker())),
       assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
+      process_footer_timer_stats_(NULL),
       num_cols_counter_(NULL),
       num_row_groups_counter_(NULL),
       codegend_process_scratch_batch_fn_(NULL) {
@@ -164,6 +165,9 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
       ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
   num_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
+  process_footer_timer_stats_ =
+      ADD_SUMMARY_STATS_TIMER(
+          scan_node_->runtime_profile(), "FooterProcessingTime");
 
   codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
       scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
@@ -184,12 +188,21 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
 
+  // Each scan node can process multiple splits. Each split processes the footer once.
+  // We use a timer to measure the time taken to ProcessFooter() per split and add
+  // this time to the averaged timer.
+  MonotonicStopWatch single_footer_process_timer;
+  single_footer_process_timer.Start();
   // First process the file metadata in the footer.
-  Status status = ProcessFooter();
+  Status footer_status = ProcessFooter();
+  single_footer_process_timer.Stop();
+
+  process_footer_timer_stats_->UpdateCounter(single_footer_process_timer.ElapsedTime());
+
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
   context_->ReleaseCompletedResources(NULL, true);
-  RETURN_IF_ERROR(status);
+  RETURN_IF_ERROR(footer_status);
 
   // Parse the file schema into an internal representation for schema resolution.
   schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/29faca56/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 7707083..cc5795a 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -426,6 +426,9 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Timer for materializing rows.  This ignores time getting the next buffer.
   ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
 
+  /// Average and min/max time spent processing the footer by each split.
+  RuntimeProfile::SummaryStatsCounter* process_footer_timer_stats_;
+
   /// Number of columns that need to be read.
   RuntimeProfile::Counter* num_cols_counter_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/29faca56/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 77d7938..b37235f 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -46,6 +46,8 @@ namespace impala {
   #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
       (profile)->AddTimeSeriesCounter(name, src_counter)
   #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
+  #define ADD_SUMMARY_STATS_TIMER(profile, name) \
+      (profile)->AddSummaryStatsCounter(name, TUnit::TIME_NS)
   #define ADD_CHILD_TIMER(profile, name, parent) \
       (profile)->AddCounter(name, TUnit::TIME_NS, parent)
   #define SCOPED_TIMER(c) \
@@ -65,6 +67,7 @@ namespace impala {
   #define ADD_COUNTER(profile, name, unit) NULL
   #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) NULL
   #define ADD_TIMER(profile, name) NULL
+  #define ADD_SUMMARY_STATS_TIMER(profile, name) NULL
   #define ADD_CHILD_TIMER(profile, name, parent) NULL
   #define SCOPED_TIMER(c)
   #define CANCEL_SAFE_SCOPED_TIMER(c)
@@ -181,17 +184,9 @@ class RuntimeProfile::AveragedCounter : public RuntimeProfile::Counter {
 
   /// The value for this counter should be updated through UpdateCounter().
   /// Set() and Add() should not be used.
-  virtual void Set(double value) {
-    DCHECK(false);
-  }
-
-  virtual void Set(int64_t value) {
-    DCHECK(false);
-  }
-
-  virtual void Add(int64_t delta) {
-    DCHECK(false);
-  }
+  virtual void Set(double value) { DCHECK(false); }
+  virtual void Set(int64_t value) { DCHECK(false); }
+  virtual void Add(int64_t delta) { DCHECK(false); }
 
  private:
   /// Map from counters to their existing values. Modified via UpdateCounter().
@@ -204,6 +199,63 @@ class RuntimeProfile::AveragedCounter : public RuntimeProfile::Counter {
   int64_t current_int_sum_;
 };
 
+/// This counter records multiple values and keeps track of the minimum, maximum and
+/// average of all the values seen so far.
+/// Unlike the AveragedCounter, which maintains an average of other counters, this
+/// counter tracks statistics of raw values.
+/// value() returns the (integer) average.
+class RuntimeProfile::SummaryStatsCounter : public RuntimeProfile::Counter {
+ public:
+  SummaryStatsCounter(TUnit::type unit, int32_t total_num_values,
+      int64_t min_value, int64_t max_value, int64_t sum)
+   : Counter(unit),
+     total_num_values_(total_num_values),
+     min_(min_value),
+     max_(max_value),
+     sum_(sum) {
+    value_.Store(total_num_values == 0 ? 0 : sum / total_num_values);
+  }
+
+  SummaryStatsCounter(TUnit::type unit)
+   : Counter(unit),
+     total_num_values_(0),
+     min_(std::numeric_limits<int64_t>::max()),
+     max_(std::numeric_limits<int64_t>::min()),
+     sum_(0) {
+  }
+
+  int64_t MinValue();
+  int64_t MaxValue();
+  int32_t TotalNumValues();
+
+  /// Update sum_ with the new value and also update the min and the max values
+  /// seen so far.
+  void UpdateCounter(int64_t new_value);
+
+  /// The value for this counter should be updated through UpdateCounter() or SetStats().
+  /// Set() and Add() should not be used.
+  virtual void Set(double value) { DCHECK(false); }
+  virtual void Set(int64_t value) { DCHECK(false); }
+  virtual void Add(int64_t delta) { DCHECK(false); }
+
+  /// Overwrites the existing counter with 'counter'
+  void SetStats(const TSummaryStatsCounter& counter);
+
+  void ToThrift(TSummaryStatsCounter* counter, const std::string& name);
+
+ private:
+  /// The total number of values seen so far.
+  int32_t total_num_values_;
+
+  /// Summary statistics of values seen so far.
+  int64_t min_;
+  int64_t max_;
+  int64_t sum_;
+
+  /// Protects min_, max_, sum_, total_num_values_ and value_.
+  SpinLock lock_;
+};
+
 /// A set of counters that measure thread info, such as total time, user time, sys time.
 class RuntimeProfile::ThreadCounters {
  private:
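The SummaryStatsCounter declared above keeps four pieces of state (count, sum, min, max) and derives the average on demand. A minimal standalone sketch of the same bookkeeping follows, assuming std::mutex in place of Impala's SpinLock; SimpleSummaryStats and its method names are hypothetical, and the count is an int64_t here rather than the int32_t used in the class above.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <mutex>

// Hypothetical stand-in for RuntimeProfile::SummaryStatsCounter: tracks the count,
// sum, min and max of raw samples, all guarded by one mutex.
class SimpleSummaryStats {
 public:
  void Update(int64_t v) {
    std::lock_guard<std::mutex> l(lock_);
    ++count_;
    sum_ += v;
    if (v < min_) min_ = v;
    if (v > max_) max_ = v;
  }

  // Integer average (truncated toward zero); 0 when no samples have been seen.
  int64_t Average() const {
    std::lock_guard<std::mutex> l(lock_);
    return count_ == 0 ? 0 : sum_ / count_;
  }
  int64_t Min() const { std::lock_guard<std::mutex> l(lock_); return min_; }
  int64_t Max() const { std::lock_guard<std::mutex> l(lock_); return max_; }
  int64_t Count() const { std::lock_guard<std::mutex> l(lock_); return count_; }

 private:
  mutable std::mutex lock_;
  int64_t count_ = 0;
  int64_t sum_ = 0;
  // Sentinels: the first sample always replaces both of them.
  int64_t min_ = std::numeric_limits<int64_t>::max();
  int64_t max_ = std::numeric_limits<int64_t>::min();
};
```

Feeding it the sequence from CountersTest.SummaryStatsCounters (10, 10, 40, 0, -40) yields an average of 4, a minimum of -40 and a maximum of 40.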

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/29faca56/be/src/util/runtime-profile-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index 6910ac9..fe9f3ae 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -258,6 +258,65 @@ TEST(CountersTest, HighWaterMarkCounters) {
   EXPECT_EQ(bytes_counter->value(), 28);
 }
 
+TEST(CountersTest, SummaryStatsCounters) {
+  ObjectPool pool;
+  RuntimeProfile profile1(&pool, "Profile 1");
+  RuntimeProfile::SummaryStatsCounter* summary_stats_counter_1 =
+    profile1.AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
+
+  EXPECT_EQ(summary_stats_counter_1->value(), 0);
+  EXPECT_EQ(summary_stats_counter_1->MinValue(), numeric_limits<int64_t>::max());
+  EXPECT_EQ(summary_stats_counter_1->MaxValue(), numeric_limits<int64_t>::min());
+
+  summary_stats_counter_1->UpdateCounter(10);
+  EXPECT_EQ(summary_stats_counter_1->value(), 10);
+  EXPECT_EQ(summary_stats_counter_1->MinValue(), 10);
+  EXPECT_EQ(summary_stats_counter_1->MaxValue(), 10);
+
+  // Check that the average stays the same when updating with the same number.
+  summary_stats_counter_1->UpdateCounter(10);
+  EXPECT_EQ(summary_stats_counter_1->value(), 10);
+  EXPECT_EQ(summary_stats_counter_1->MinValue(), 10);
+  EXPECT_EQ(summary_stats_counter_1->MaxValue(), 10);
+
+  summary_stats_counter_1->UpdateCounter(40);
+  EXPECT_EQ(summary_stats_counter_1->value(), 20);
+  EXPECT_EQ(summary_stats_counter_1->MinValue(), 10);
+  EXPECT_EQ(summary_stats_counter_1->MaxValue(), 40);
+
+  // Verify an update with 0. This should still change the average as the number of
+  // samples increases.
+  summary_stats_counter_1->UpdateCounter(0);
+  EXPECT_EQ(summary_stats_counter_1->value(), 15);
+  EXPECT_EQ(summary_stats_counter_1->MinValue(), 0);
+  EXPECT_EQ(summary_stats_counter_1->MaxValue(), 40);
+
+  // Verify a negative update.
+  summary_stats_counter_1->UpdateCounter(-40);
+  EXPECT_EQ(summary_stats_counter_1->value(), 4);
+  EXPECT_EQ(summary_stats_counter_1->MinValue(), -40);
+  EXPECT_EQ(summary_stats_counter_1->MaxValue(), 40);
+
+  RuntimeProfile profile2(&pool, "Profile 2");
+  RuntimeProfile::SummaryStatsCounter* summary_stats_counter_2 =
+    profile2.AddSummaryStatsCounter("summary_stats", TUnit::UNIT);
+
+  summary_stats_counter_2->UpdateCounter(100);
+  EXPECT_EQ(summary_stats_counter_2->value(), 100);
+  EXPECT_EQ(summary_stats_counter_2->MinValue(), 100);
+  EXPECT_EQ(summary_stats_counter_2->MaxValue(), 100);
+
+  TRuntimeProfileTree tprofile1;
+  profile1.ToThrift(&tprofile1);
+
+  // Update profile2 from profile1 and check that profile2's counter is overwritten.
+  profile2.Update(tprofile1);
+  EXPECT_EQ(summary_stats_counter_2->value(), 4);
+  EXPECT_EQ(summary_stats_counter_2->MinValue(), -40);
+  EXPECT_EQ(summary_stats_counter_2->MaxValue(), 40);
+
+}
+
 TEST(CountersTest, DerivedCounters) {
   ObjectPool pool;
   RuntimeProfile profile(&pool, "Profile");
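The expected value() results in CountersTest.SummaryStatsCounters follow from dividing the running sum $S_n$ by the sample count $n$ (integer division, which happens to be exact at every step here):

```latex
\begin{array}{c|r|r|r|r|r}
n & x_n & S_n = \sum_{i=1}^{n} x_i & S_n / n & \min_{i \le n} x_i & \max_{i \le n} x_i \\ \hline
1 & 10  & 10 & 10 & 10  & 10 \\
2 & 10  & 20 & 10 & 10  & 10 \\
3 & 40  & 60 & 20 & 10  & 40 \\
4 & 0   & 60 & 15 & 0   & 40 \\
5 & -40 & 20 & 4  & -40 & 40
\end{array}
```

After profile2.Update(tprofile1), profile2's counter reports the same final row, and its own earlier sample of 100 no longer contributes, because SetStats() replaces the stored statistics instead of merging them.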

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/29faca56/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index b19ba33..a9b46b9 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -133,6 +133,14 @@ RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
     }
   }
 
+  if (node.__isset.summary_stats_counters) {
+    for (const TSummaryStatsCounter& val: node.summary_stats_counters) {
+      profile->summary_stats_map_[val.name] =
+          pool->Add(new SummaryStatsCounter(
+              val.unit, val.total_num_values, val.min_value, val.max_value, val.sum));
+    }
+  }
+
   profile->child_counter_map_ = node.child_counters_map;
   profile->info_strings_ = node.info_strings;
   profile->info_strings_display_order_ = node.info_strings_display_order;
@@ -286,6 +294,21 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
     }
   }
 
+  {
+    lock_guard<SpinLock> l(summary_stats_map_lock_);
+    for (int i = 0; i < node.summary_stats_counters.size(); ++i) {
+      const TSummaryStatsCounter& c = node.summary_stats_counters[i];
+      SummaryStatsCounterMap::iterator it = summary_stats_map_.find(c.name);
+      if (it == summary_stats_map_.end()) {
+        summary_stats_map_[c.name] =
+            pool_->Add(new SummaryStatsCounter(
+                c.unit, c.total_num_values, c.min_value, c.max_value, c.sum));
+      } else {
+        it->second->SetStats(c);
+      }
+    }
+  }
+
   ++*idx;
   {
     lock_guard<SpinLock> l(children_lock_);
@@ -651,6 +674,28 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
     }
   }
 
+  {
+    lock_guard<SpinLock> l(summary_stats_map_lock_);
+    // Print all SummaryStatsCounters as follows:
+    // <Name>: (Avg: <value> ; Min: <min_value> ; Max: <max_value> ;
+    // Number of samples: <total>)
+    for (const SummaryStatsCounterMap::value_type& v: summary_stats_map_) {
+      if (v.second->TotalNumValues() == 0) {
+        // There is no point printing all the stats if the number of samples is zero.
+        stream << prefix << "   - " << v.first << ": "
+               << PrettyPrinter::Print(v.second->value(), v.second->unit(), true)
+               << " (Number of samples: " << v.second->TotalNumValues() << ")" << endl;
+      } else {
+        stream << prefix << "   - " << v.first << ": (Avg: "
+               << PrettyPrinter::Print(v.second->value(), v.second->unit(), true)
+               << " ; Min: "
+               << PrettyPrinter::Print(v.second->MinValue(), v.second->unit(), true)
+               << " ; Max: "
+               << PrettyPrinter::Print(v.second->MaxValue(), v.second->unit(), true)
+               << " ; Number of samples: " << v.second->TotalNumValues() << ")" << endl;
+      }
+    }
+  }
   RuntimeProfile::PrintChildCounters(
       prefix, ROOT_COUNTER, counter_map, child_counter_map, s);
 
@@ -760,8 +805,8 @@ void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const {
   {
     lock_guard<SpinLock> l(time_series_counter_map_lock_);
     if (time_series_counter_map_.size() != 0) {
-      node.__set_time_series_counters(vector<TTimeSeriesCounter>());
-      node.time_series_counters.resize(time_series_counter_map_.size());
+      node.__set_time_series_counters(
+          vector<TTimeSeriesCounter>(time_series_counter_map_.size()));
       int idx = 0;
       for (const TimeSeriesCounterMap::value_type& val: time_series_counter_map_) {
         val.second->ToThrift(&node.time_series_counters[idx++]);
@@ -769,6 +814,18 @@ void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const {
     }
   }
 
+  {
+    lock_guard<SpinLock> l(summary_stats_map_lock_);
+    if (summary_stats_map_.size() != 0) {
+      node.__set_summary_stats_counters(
+          vector<TSummaryStatsCounter>(summary_stats_map_.size()));
+      int idx = 0;
+      for (const SummaryStatsCounterMap::value_type& val: summary_stats_map_) {
+        val.second->ToThrift(&node.summary_stats_counters[idx++], val.first);
+      }
+    }
+  }
+
   ChildVector children;
   {
     lock_guard<SpinLock> l(children_lock_);
@@ -896,6 +953,18 @@ void RuntimeProfile::PrintChildCounters(const string& prefix,
   }
 }
 
+RuntimeProfile::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
+    const string& name, TUnit::type unit, const std::string& parent_counter_name) {
+  DCHECK_EQ(is_averaged_profile_, false);
+  lock_guard<SpinLock> l(summary_stats_map_lock_);
+  if (summary_stats_map_.find(name) != summary_stats_map_.end()) {
+    return summary_stats_map_[name];
+  }
+  SummaryStatsCounter* counter = pool_->Add(new SummaryStatsCounter(unit));
+  summary_stats_map_[name] = counter;
+  return counter;
+}
+
 RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddTimeSeriesCounter(
     const string& name, TUnit::type unit, DerivedCounterFunction fn) {
   DCHECK(fn != NULL);
@@ -945,4 +1014,52 @@ void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) const {
   }
 }
 
+void RuntimeProfile::SummaryStatsCounter::ToThrift(TSummaryStatsCounter* counter,
+    const std::string& name) {
+  lock_guard<SpinLock> l(lock_);
+  counter->name = name;
+  counter->unit = unit_;
+  counter->sum = sum_;
+  counter->total_num_values = total_num_values_;
+  counter->min_value = min_;
+  counter->max_value = max_;
+}
+
+void RuntimeProfile::SummaryStatsCounter::UpdateCounter(int64_t new_value) {
+  lock_guard<SpinLock> l(lock_);
+
+  ++total_num_values_;
+  sum_ += new_value;
+  value_.Store(sum_ / total_num_values_);
+
+  if (new_value < min_) min_ = new_value;
+  if (new_value > max_) max_ = new_value;
+}
+
+void RuntimeProfile::SummaryStatsCounter::SetStats(const TSummaryStatsCounter& counter) {
+  lock_guard<SpinLock> l(lock_);
+  unit_ = counter.unit;
+  sum_ = counter.sum;
+  total_num_values_ = counter.total_num_values;
+  min_ = counter.min_value;
+  max_ = counter.max_value;
+
+  value_.Store(total_num_values_ == 0 ? 0 : sum_ / total_num_values_);
+}
+
+int64_t RuntimeProfile::SummaryStatsCounter::MinValue() {
+  lock_guard<SpinLock> l(lock_);
+  return min_;
+}
+
+int64_t RuntimeProfile::SummaryStatsCounter::MaxValue() {
+  lock_guard<SpinLock> l(lock_);
+  return max_;
+}
+
+int32_t RuntimeProfile::SummaryStatsCounter::TotalNumValues() {
+  lock_guard<SpinLock> l(lock_);
+  return total_num_values_;
+}
+
 }
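The two output shapes produced by the PrettyPrint() hunk above can be sketched as a pure formatting function. This is a minimal sketch, assuming raw nanosecond integers with an "ns" suffix in place of PrettyPrinter::Print(), which picks human-readable units in the real code; FormatSummaryStats is a hypothetical name, and it uses the three-space "   - " indent for both branches.

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

// Hypothetical formatter for one summary stats line. With zero samples only the
// (zero) average and the sample count are printed; otherwise Avg, Min, Max and
// the sample count are printed in one parenthesized, ';'-separated group.
std::string FormatSummaryStats(const std::string& prefix, const std::string& name,
    int64_t avg, int64_t min, int64_t max, int64_t num_samples) {
  std::ostringstream out;
  out << prefix << "   - " << name << ": ";
  if (num_samples == 0) {
    out << avg << "ns (Number of samples: 0)";
  } else {
    out << "(Avg: " << avg << "ns ; Min: " << min << "ns ; Max: " << max
        << "ns ; Number of samples: " << num_samples << ")";
  }
  return out.str();
}
```

Keeping the format in one place like this makes it easy to keep the printer and any test regex that parses its output in sync.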

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/29faca56/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 513f39d..c409d62 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -94,6 +94,7 @@ class RuntimeProfile {
   class DerivedCounter;
   class EventSequence;
   class HighWaterMarkCounter;
+  class SummaryStatsCounter;
   class ThreadCounters;
   class TimeSeriesCounter;
 
@@ -159,8 +160,13 @@ class RuntimeProfile {
   Counter* AddCounter(const std::string& name, TUnit::type unit,
       const std::string& parent_counter_name = "");
 
+  /// Adds a counter that tracks the min, max and average values to the runtime profile.
+  /// Otherwise behaves like AddCounter(), but 'parent_counter_name' is currently unused.
+  SummaryStatsCounter* AddSummaryStatsCounter(const std::string& name, TUnit::type unit,
+      const std::string& parent_counter_name = "");
+
   /// Adds a high water mark counter to the runtime profile. Otherwise, same behavior
-  /// as AddCounter()
+  /// as AddCounter().
   HighWaterMarkCounter* AddHighWaterMarkCounter(const std::string& name,
       TUnit::type unit, const std::string& parent_counter_name = "");
 
@@ -396,6 +402,12 @@ class RuntimeProfile {
   /// Protects time_series_counter_map_.
   mutable SpinLock time_series_counter_map_lock_;
 
+  typedef std::map<std::string, SummaryStatsCounter*> SummaryStatsCounterMap;
+  SummaryStatsCounterMap summary_stats_map_;
+
+  /// Protects summary_stats_map_.
+  mutable SpinLock summary_stats_map_lock_;
+
   Counter counter_total_time_;
 
   /// Total time spent waiting (on non-children) that should not be counted when
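RuntimeProfile keeps summary stats counters in a name-keyed map guarded by a single lock: AddSummaryStatsCounter() returns the existing counter when the name is already present, and Update() overwrites an existing counter with the incoming Thrift values instead of merging them. A minimal sketch of both behaviors, assuming std::mutex and std::unique_ptr in place of SpinLock and ObjectPool; Stats, StatsRegistry, GetOrAdd and Overwrite are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical plain-data stand-in for one counter's statistics.
struct Stats {
  int64_t sum = 0;
  int64_t total_num_values = 0;
  int64_t min_value = 0;
  int64_t max_value = 0;
};

// Hypothetical registry keyed by counter name, with one lock for the whole map.
class StatsRegistry {
 public:
  // Get-or-create: repeated calls with the same name return the same object.
  // Heap allocation keeps returned pointers stable as the map grows.
  Stats* GetOrAdd(const std::string& name) {
    std::lock_guard<std::mutex> l(lock_);
    std::unique_ptr<Stats>& slot = map_[name];  // single lookup, inserts if absent
    if (slot == nullptr) slot.reset(new Stats());
    return slot.get();
  }

  // Overwrite semantics: incoming stats replace existing ones, they are not merged.
  void Overwrite(const std::string& name, const Stats& incoming) {
    std::lock_guard<std::mutex> l(lock_);
    std::unique_ptr<Stats>& slot = map_[name];
    if (slot == nullptr) {
      slot.reset(new Stats(incoming));
    } else {
      *slot = incoming;
    }
  }

 private:
  std::mutex lock_;
  std::map<std::string, std::unique_ptr<Stats>> map_;
};
```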

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/29faca56/common/thrift/RuntimeProfile.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/RuntimeProfile.thrift b/common/thrift/RuntimeProfile.thrift
index 068108b..8751b17 100644
--- a/common/thrift/RuntimeProfile.thrift
+++ b/common/thrift/RuntimeProfile.thrift
@@ -52,6 +52,16 @@ struct TTimeSeriesCounter {
   4: required list<i64> values
 }
 
+// Thrift version of RuntimeProfile::SummaryStatsCounter.
+struct TSummaryStatsCounter {
+  1: required string name
+  2: required Metrics.TUnit unit
+  3: required i64 sum
+  4: required i64 total_num_values
+  5: required i64 min_value
+  6: required i64 max_value
+}
+
 // A single runtime profile
 struct TRuntimeProfileNode {
   1: required string name
@@ -80,6 +90,9 @@ struct TRuntimeProfileNode {
 
   // List of time series counters
   10: optional list<TTimeSeriesCounter> time_series_counters
+
+  // List of summary stats counters
+  11: optional list<TSummaryStatsCounter> summary_stats_counters
 }
 
 // A flattened tree of runtime profiles, obtained by an
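TSummaryStatsCounter carries the raw sum and count, not the average, so the receiving side has to recompute the average (as the SummaryStatsCounter constructor and SetStats() do). A minimal sketch with a hypothetical plain struct mirroring the Thrift fields (the unit field is left out), showing the empty-counter guard and C++ integer division, which truncates toward zero for negative sums:

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical plain struct mirroring the fields of TSummaryStatsCounter; the real
// type is generated by the Thrift compiler.
struct WireSummaryStats {
  std::string name;
  int64_t sum;
  int64_t total_num_values;
  int64_t min_value;
  int64_t max_value;
};

// The average is not sent on the wire. Recompute it from sum and count, guarding
// the case of a counter that never received a sample (count 0).
int64_t AverageFromWire(const WireSummaryStats& s) {
  return s.total_num_values == 0 ? 0 : s.sum / s.total_num_values;
}
```

Sending sum and count rather than the average keeps the receiver able to reproduce exactly the value the sender would report.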

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/29faca56/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 6dce0af..0231261 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -337,6 +337,14 @@ class TestParquet(ImpalaTestSuite):
         'ScanRangesComplete: ([0-9]*)', runtime_profile)
     num_rows_read_list = re.findall('RowsRead: [0-9.K]* \(([0-9]*)\)', runtime_profile)
 
+    REGEX_UNIT_SECOND = "[0-9]*[s]*[0-9]*[.]*[0-9]*[num]*[s]*"
+    REGEX_MIN_MAX_FOOTER_PROCESSING_TIME = \
+        (r"FooterProcessingTime: \(Avg: (%s) ; Min: (%s) ; Max: (%s) ; "
+            r"Number of samples: %s\)" % (REGEX_UNIT_SECOND, REGEX_UNIT_SECOND,
+            REGEX_UNIT_SECOND, "[0-9]*"))
+    footer_processing_time_list = re.findall(
+        REGEX_MIN_MAX_FOOTER_PROCESSING_TIME, runtime_profile)
+
     # This will fail if the number of impalads != 3
     # The fourth fragment is the "Averaged Fragment"
     assert len(num_row_groups_list) == 4
@@ -359,6 +367,16 @@ class TestParquet(ImpalaTestSuite):
     for scan_ranges_complete in scan_ranges_complete_list:
       assert int(scan_ranges_complete) == ranges_per_node
 
+    # This checks that the SummaryStatsCounter works correctly. When there is one scan
+    # range per node, each FooterProcessingTime counter has exactly one sample, so its
+    # min, max and average must all be equal.
+    # TODO: Test this for multiple ranges per node as well. This requires converting
+    # the printed times to numbers and checking that min <= avg <= max.
+    if ranges_per_node == 1:
+      for min_max_time in footer_processing_time_list:
+        # Assert that (min == avg == max)
+        assert min_max_time[0] == min_max_time[1] == min_max_time[2] != 0
+
   def test_annotate_utf8_option(self, vector, unique_database):
     if self.exploration_strategy() != 'exhaustive': pytest.skip("Only run in exhaustive")
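The Python test pulls the FooterProcessingTime statistics out of the printed profile with a regular expression. For consistency with the other examples, here is a minimal C++ sketch of the same check using std::regex that captures all three times plus the sample count. It assumes printed durations start with a digit and continue with digits, lowercase unit letters and dots (for example "0.000ns", "12.345us" or "1s034ms"), which is an assumption about PrettyPrinter output; ParseFooterTimes, FooterTimes and SingleSampleConsistent are hypothetical names.

```cpp
#include <cassert>
#include <regex>
#include <string>
#include <vector>

// One parsed "(Avg ; Min ; Max ; Number of samples)" entry. The times are kept as
// the printed strings, as in the Python test.
struct FooterTimes {
  std::string avg, min, max;
  long samples;
};

// Hypothetical parser for every FooterProcessingTime line in a printed profile.
std::vector<FooterTimes> ParseFooterTimes(const std::string& profile) {
  const std::string t = "([0-9][0-9a-z.]*)";  // assumed shape of a printed duration
  const std::regex re("FooterProcessingTime: \\(Avg: " + t + " ; Min: " + t +
                      " ; Max: " + t + " ; Number of samples: ([0-9]+)\\)");
  std::vector<FooterTimes> out;
  for (std::sregex_iterator it(profile.begin(), profile.end(), re), end; it != end;
       ++it) {
    const std::smatch& m = *it;
    out.push_back({m[1].str(), m[2].str(), m[3].str(), std::stol(m[4].str())});
  }
  return out;
}

// With exactly one sample, avg, min and max must be the same printed value.
bool SingleSampleConsistent(const FooterTimes& f) {
  return f.samples != 1 || (f.avg == f.min && f.min == f.max);
}
```

Comparing printed strings is only meaningful for the single-sample case; the TODO's min <= avg <= max check would first need each string converted to nanoseconds.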