You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/22 17:58:46 UTC

[1/4] impala git commit: IMPALA-5139: Update mvn-quiet.sh to print execution content to log file

Repository: impala
Updated Branches:
  refs/heads/2.x 9d7b2103f -> d89db5b0d


IMPALA-5139: Update mvn-quiet.sh to print execution content to log file

Verified the changes to mvn-quiet.sh by trigerring an impala build by
running bootstrap_development.sh which invokes mvn-quiet at multiple
places. Verified the creation of mvn log file with the relevant content
in the $IMPALA_HOME/logs/mvn folder

Change-Id: I475b17a4dccfa624dda61402491b461c53473f8b
Reviewed-on: http://gerrit.cloudera.org:8080/9273
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/11e0bede
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/11e0bede
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/11e0bede

Branch: refs/heads/2.x
Commit: 11e0bede387fa713ef9b2cda16454f7db6ea2594
Parents: 3846177
Author: njanarthanancldr <nj...@cloudera.com>
Authored: Fri Feb 9 16:08:07 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 22 02:52:03 2018 +0000

----------------------------------------------------------------------
 bin/impala-config.sh |  3 ++-
 bin/mvn-quiet.sh     | 20 +++++++++++++-------
 2 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/11e0bede/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 15b9116..09476c3 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -414,12 +414,13 @@ export IMPALA_FE_TEST_COVERAGE_DIR="${IMPALA_FE_TEST_LOGS_DIR}/coverage"
 export IMPALA_BE_TEST_LOGS_DIR="${IMPALA_LOGS_DIR}/be_tests"
 export IMPALA_EE_TEST_LOGS_DIR="${IMPALA_LOGS_DIR}/ee_tests"
 export IMPALA_CUSTOM_CLUSTER_TEST_LOGS_DIR="${IMPALA_LOGS_DIR}/custom_cluster_tests"
+export IMPALA_MVN_LOGS_DIR="${IMPALA_LOGS_DIR}/mvn"
 # List of all Impala log dirs so they can be created by buildall.sh
 export IMPALA_ALL_LOGS_DIRS="${IMPALA_CLUSTER_LOGS_DIR}
   ${IMPALA_DATA_LOADING_LOGS_DIR} ${IMPALA_DATA_LOADING_SQL_DIR}
   ${IMPALA_FE_TEST_LOGS_DIR} ${IMPALA_FE_TEST_COVERAGE_DIR}
   ${IMPALA_BE_TEST_LOGS_DIR} ${IMPALA_EE_TEST_LOGS_DIR}
-  ${IMPALA_CUSTOM_CLUSTER_TEST_LOGS_DIR}"
+  ${IMPALA_CUSTOM_CLUSTER_TEST_LOGS_DIR} ${IMPALA_MVN_LOGS_DIR}"
 
 # Reduce the concurrency for local tests to half the number of cores in the system.
 CORES=$(($(getconf _NPROCESSORS_ONLN) / 2))

http://git-wip-us.apache.org/repos/asf/impala/blob/11e0bede/bin/mvn-quiet.sh
----------------------------------------------------------------------
diff --git a/bin/mvn-quiet.sh b/bin/mvn-quiet.sh
index 591133d..beeaeb9 100755
--- a/bin/mvn-quiet.sh
+++ b/bin/mvn-quiet.sh
@@ -19,14 +19,20 @@
 
 # Utility script that invokes maven but filters out noisy info logging
 set -euo pipefail
-echo "========================================================================"
-echo "Running mvn $IMPALA_MAVEN_OPTIONS $@"
-echo "Directory: $(pwd)"
-echo "========================================================================"
+
+LOG_FILE="${IMPALA_MVN_LOGS_DIR}/mvn.log"
+
+mkdir -p "$IMPALA_MVN_LOGS_DIR"
+
+cat << EOF | tee -a "$LOG_FILE"
+========================================================================
+Running mvn $IMPALA_MAVEN_OPTIONS $@
+Directory $(pwd)
+========================================================================
+EOF
+
 if ! mvn $IMPALA_MAVEN_OPTIONS "$@" | \
-    grep -E -e WARNING -e ERROR -e SUCCESS -e FAILURE -e Test; then
+  tee -a "$LOG_FILE" | grep -E -e WARNING -e ERROR -e SUCCESS -e FAILURE -e Test; then
   echo "mvn $IMPALA_MAVEN_OPTIONS $@ exited with code $?"
   exit 1
 fi
-echo "------------------------------------------------------------------------"
-echo


[3/4] impala git commit: IMPALA-6461 : Micro-optimizations to DataStreamSender::Send

Posted by ta...@apache.org.
IMPALA-6461 : Micro-optimizations to DataStreamSender::Send

While analyzing performance of partition exchange operator,
I noticed that there is dependency and a function call per row in the hot path.

Optimizations in this change are:
1) Remove the data dependency between computing the hash and the channel
2) Inline DataStreamSender::Channel::AddRow
3) Save partition_exprs_.size() to save a couple of instructions

This translates to improving CPI for DataStreamSender::Send by 10%

Change-Id: I642a9dad531a29d4838a3537ab0e04320a69960d
Reviewed-on: http://gerrit.cloudera.org:8080/9221
Reviewed-by: Mostafa Mokhtar <mm...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bf426527
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bf426527
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bf426527

Branch: refs/heads/2.x
Commit: bf42652748f2dad2fd343574ba205eb88089549c
Parents: 9d7b210
Author: mmokhtar <mm...@cloudera.com>
Authored: Mon Feb 5 17:05:02 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 22 02:52:03 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-sender.cc      | 77 ++++++++++++++++++--------
 be/src/runtime/krpc-data-stream-sender.cc | 77 ++++++++++++++++++--------
 be/src/runtime/row-batch.h                |  4 ++
 3 files changed, 110 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bf426527/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index c572467..30bf5b6 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -87,7 +87,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
   // Copies a single row into this channel's output buffer and flushes buffer
   // if it reaches capacity.
   // Returns error status if any of the preceding rpcs failed, OK otherwise.
-  Status AddRow(TupleRow* row) WARN_UNUSED_RESULT;
+  Status ALWAYS_INLINE AddRow(TupleRow* row) WARN_UNUSED_RESULT;
 
   // Asynchronously sends a row batch.
   // Returns the status of the most recently finished TransmitData
@@ -243,7 +243,7 @@ void DataStreamSender::Channel::WaitForRpc() {
   }
 }
 
-Status DataStreamSender::Channel::AddRow(TupleRow* row) {
+inline Status DataStreamSender::Channel::AddRow(TupleRow* row) {
   if (batch_->AtCapacity()) {
     // batch_ is full, let's send it; but first wait for an ongoing
     // transmission to finish before modifying thrift_batch_
@@ -443,16 +443,29 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
   } else if (partition_type_ == TPartitionType::KUDU) {
     DCHECK_EQ(partition_expr_evals_.size(), 1);
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      int32_t partition =
-          *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
-      if (partition < 0) {
-        // This row doesn't coorespond to a partition, e.g. it's outside the given ranges.
-        partition = next_unknown_partition_;
-        ++next_unknown_partition_;
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+      const int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        int32_t partition =
+            *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+        if (partition < 0) {
+          // This row doesn't correspond to a partition,
+          //  e.g. it's outside the given ranges.
+          partition = next_unknown_partition_;
+          ++next_unknown_partition_;
+        }
+        channel_ids[i] = partition % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
     }
   } else {
     DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED);
@@ -460,20 +473,36 @@ Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
     // once we have codegen here.
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      uint64_t hash_val = EXCHANGE_HASH_SEED;
-      for (int j = 0; j < partition_exprs_.size(); ++j) {
-        ScalarExprEvaluator* eval = partition_expr_evals_[j];
-        void* partition_val = eval->GetValue(row);
-        // We can't use the crc hash function here because it does not result in
-        // uncorrelated hashes with different seeds. Instead we use FastHash.
-        // TODO: fix crc hash/GetHashValue()
-        DCHECK(&(eval->root()) == partition_exprs_[j]);
-        hash_val = RawValue::GetHashValueFastHash(
-            partition_val, partition_exprs_[j]->type(), hash_val);
+    const int num_partition_exprs = partition_exprs_.size();
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    // Break the loop into two parts break the data dependency between computing
+    // the hash and calling AddRow()
+    // To keep stack allocation small a RowBatch::HASH_BATCH is used
+    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        uint64_t hash_val = EXCHANGE_HASH_SEED;
+        for (int j = 0; j < num_partition_exprs; ++j) {
+          ScalarExprEvaluator* eval = partition_expr_evals_[j];
+          void* partition_val = eval->GetValue(row);
+          // We can't use the crc hash function here because it does not result in
+          // uncorrelated hashes with different seeds. Instead we use FastHash.
+          // TODO: fix crc hash/GetHashValue()
+          DCHECK(&(eval->root()) == partition_exprs_[j]);
+          hash_val = RawValue::GetHashValueFastHash(
+              partition_val, partition_exprs_[j]->type(), hash_val);
+        }
+        channel_ids[i] = hash_val % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
     }
   }
   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());

http://git-wip-us.apache.org/repos/asf/impala/blob/bf426527/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 4866e4e..6c0ad01 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -129,7 +129,7 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // it reaches capacity. This call may block if the row batch's capacity is reached
   // and the preceding RPC is still in progress. Returns error status if serialization
   // failed or if the preceding RPC failed. Return OK otherwise.
-  Status AddRow(TupleRow* row);
+  Status ALWAYS_INLINE AddRow(TupleRow* row);
 
   // Shutdowns the channel and frees the row batch allocation. Any in-flight RPC will
   // be cancelled. It's expected that clients normally call FlushAndSendEos() before
@@ -474,7 +474,7 @@ Status KrpcDataStreamSender::Channel::SendCurrentBatch() {
   return Status::OK();
 }
 
-Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
+inline Status KrpcDataStreamSender::Channel::AddRow(TupleRow* row) {
   if (batch_->AtCapacity()) {
     // batch_ is full, let's send it.
     RETURN_IF_ERROR(SendCurrentBatch());
@@ -660,16 +660,29 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
   } else if (partition_type_ == TPartitionType::KUDU) {
     DCHECK_EQ(partition_expr_evals_.size(), 1);
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      int32_t partition =
-          *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
-      if (partition < 0) {
-        // This row doesn't correspond to a partition, e.g. it's outside the given ranges.
-        partition = next_unknown_partition_;
-        ++next_unknown_partition_;
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        int32_t partition =
+            *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
+        if (partition < 0) {
+          // This row doesn't correspond to a partition,
+          // e.g. it's outside the given ranges.
+          partition = next_unknown_partition_;
+          ++next_unknown_partition_;
+        }
+        channel_ids[i] = partition % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[partition % num_channels]->AddRow(row));
     }
   } else {
     DCHECK_EQ(partition_type_, TPartitionType::HASH_PARTITIONED);
@@ -677,20 +690,36 @@ Status KrpcDataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
     // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
     // once we have codegen here.
     int num_channels = channels_.size();
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      TupleRow* row = batch->GetRow(i);
-      uint64_t hash_val = EXCHANGE_HASH_SEED;
-      for (int j = 0; j < partition_exprs_.size(); ++j) {
-        ScalarExprEvaluator* eval = partition_expr_evals_[j];
-        void* partition_val = eval->GetValue(row);
-        // We can't use the crc hash function here because it does not result in
-        // uncorrelated hashes with different seeds. Instead we use FastHash.
-        // TODO: fix crc hash/GetHashValue()
-        DCHECK(&(eval->root()) == partition_exprs_[j]);
-        hash_val = RawValue::GetHashValueFastHash(
-            partition_val, partition_exprs_[j]->type(), hash_val);
+    const int num_partition_exprs = partition_exprs_.size();
+    const int num_rows = batch->num_rows();
+    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
+    int channel_ids[hash_batch_size];
+
+    // Break the loop into two parts break the data dependency between computing
+    // the hash and calling AddRow()
+    // To keep stack allocation small a RowBatch::HASH_BATCH is used
+    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
+      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        uint64_t hash_val = EXCHANGE_HASH_SEED;
+        for (int j = 0; j < num_partition_exprs; ++j) {
+          ScalarExprEvaluator* eval = partition_expr_evals_[j];
+          void* partition_val = eval->GetValue(row);
+          // We can't use the crc hash function here because it does not result in
+          // uncorrelated hashes with different seeds. Instead we use FastHash.
+          // TODO: fix crc hash/GetHashValue()
+          DCHECK(&(eval->root()) == partition_exprs_[j]);
+          hash_val = RawValue::GetHashValueFastHash(
+              partition_val, partition_exprs_[j]->type(), hash_val);
+        }
+        channel_ids[i] = hash_val % num_channels;
+      }
+
+      for (int i = 0; i < batch_window_size; ++i) {
+        TupleRow* row = batch->GetRow(i + batch_start);
+        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
       }
-      RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
     }
   }
   COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());

http://git-wip-us.apache.org/repos/asf/impala/blob/bf426527/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index aad5ebe..3bde4d1 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -385,6 +385,10 @@ class RowBatch {
   // in order to leave room for variable-length data.
   static const int FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;
 
+  // Batch size to compute hash, keep it small to avoid large stack allocations.
+  // 16 provided the same speedup compared to operating over a full batch.
+  static const int HASH_BATCH_SIZE = 16;
+
   /// Allocates a buffer large enough for the fixed-length portion of 'capacity_' rows in
   /// this batch from 'tuple_data_pool_'. 'capacity_' is reduced if the allocation would
   /// exceed FIXED_LEN_BUFFER_LIMIT. Always returns enough space for at least one row.


[4/4] impala git commit: IMPALA-6517: bootstrap_toolchain.py fails to recognize lsb_release output from RHEL OS

Posted by ta...@apache.org.
IMPALA-6517: bootstrap_toolchain.py fails to recognize lsb_release
output from RHEL OS

The OS map that we currently use to check platform/OS release
against in bootstrap_toolchain.py does not contain key-value pairs
for Redhat platforms.
e.g.
lsb_release -irs
RedHatEnterpriseServer 6.9

This change adds RHEL5, RHEL6 and RHEL7 to the OS map. It also
relaxes the matching criteria for RHEL and CentOS to only major
version.

Testing: I manually cloned a repo locally and called
bootstrap_toolchain.py to verify that it can detect the platform.
Testing was done against RHEL6, RHEL7, Ubuntu16.04 and Centos7.

Change-Id: I83874220bd424a452df49520b5dad7bfa2124ca6
Reviewed-on: http://gerrit.cloudera.org:8080/9310
Reviewed-by: Lars Volker <lv...@cloudera.com>
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d89db5b0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d89db5b0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d89db5b0

Branch: refs/heads/2.x
Commit: d89db5b0dc797c82a0c38fe3e563b1cda3603c5a
Parents: 11e0bed
Author: Vincent Tran <vt...@cloudera.com>
Authored: Tue Feb 13 15:58:49 2018 -0500
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 22 06:29:53 2018 +0000

----------------------------------------------------------------------
 bin/bootstrap_toolchain.py | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d89db5b0/bin/bootstrap_toolchain.py
----------------------------------------------------------------------
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index 8494c6c..8cee665 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -51,6 +51,9 @@ OS_MAPPING = {
   "centos6" : "ec2-package-centos-6",
   "centos5" : "ec2-package-centos-5",
   "centos7" : "ec2-package-centos-7",
+  "redhatenterpriseserver5" :  "ec2-package-centos-5",
+  "redhatenterpriseserver6" :  "ec2-package-centos-6",
+  "redhatenterpriseserver7" :  "ec2-package-centos-7",
   "debian6" : "ec2-package-debian-6",
   "debian7" : "ec2-package-debian-7",
   "debian8" : "ec2-package-debian-8",
@@ -107,6 +110,11 @@ def get_platform_release_label(release=None):
       release = lsb_release_cache
     else:
       release = "".join(map(lambda x: x.lower(), sh.lsb_release("-irs").split()))
+      # Only need to check against the major release if RHEL or CentOS
+      for platform in ['centos', 'redhatenterpriseserver']:
+        if platform in release:
+          release = release.split('.')[0]
+          break
       lsb_release_cache = release
   for k, v in OS_MAPPING.iteritems():
     if re.search(k, release):


[2/4] impala git commit: IMPALA-6538: Fix read path when Parquet min/max statistics contain NaN

Posted by ta...@apache.org.
IMPALA-6538: Fix read path when Parquet min/max statistics contain NaN

If the first number in a row group written by Impala is NaN,
then Impala writes incorrect statistics in the metadata.
This will result in incorrect results when filtering the
data.

This commit fixes the read path when encountering NaNs in
Parquet min/max statistics. If min and max are both NaN, we
can't use the statistics at all. If only one of them is NaN,
the other still can be used.

I added some tests to QueryTest/parqet-stats.test

Change-Id: If3897fc1426541239223670812f59e2bed32f455
Reviewed-on: http://gerrit.cloudera.org:8080/9358
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/38461775
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/38461775
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/38461775

Branch: refs/heads/2.x
Commit: 384617750401e7604b2e34fd6c809d72f810d0fb
Parents: bf42652
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Mon Feb 19 15:26:42 2018 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 22 02:52:03 2018 +0000

----------------------------------------------------------------------
 be/src/exec/parquet-column-stats.cc             |   7 +-
 testdata/data/README                            |   7 ++
 testdata/data/min_max_is_nan.parquet            | Bin 0 -> 331 bytes
 .../QueryTest/parquet-invalid-minmax-stats.test |  32 ++++++++
 .../queries/QueryTest/parquet-stats.test        |  77 +++++++++++++++++++
 tests/query_test/test_parquet_stats.py          |  16 ++++
 6 files changed, 137 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/38461775/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
index a1d1155..2f6f7fc 100644
--- a/be/src/exec/parquet-column-stats.cc
+++ b/be/src/exec/parquet-column-stats.cc
@@ -18,6 +18,7 @@
 #include "parquet-column-stats.inline.h"
 
 #include <algorithm>
+#include <cmath>
 #include <limits>
 
 #include "common/names.h"
@@ -94,11 +95,13 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
       return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot,
           col_chunk.meta_data.type);
     case TYPE_FLOAT:
+      // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN
       return ColumnStats<float>::DecodePlainValue(*stat_value, slot,
-          col_chunk.meta_data.type);
+          col_chunk.meta_data.type) && !std::isnan(*reinterpret_cast<float*>(slot));
     case TYPE_DOUBLE:
+      // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN
       return ColumnStats<double>::DecodePlainValue(*stat_value, slot,
-          col_chunk.meta_data.type);
+          col_chunk.meta_data.type) && !std::isnan(*reinterpret_cast<double*>(slot));
     case TYPE_TIMESTAMP:
       return ColumnStats<TimestampValue>::DecodePlainValue(*stat_value, slot,
           col_chunk.meta_data.type);

http://git-wip-us.apache.org/repos/asf/impala/blob/38461775/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index 2b92a0a..e293c3c 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -148,3 +148,10 @@ The file has the following schema:
     optional int32 int_col;
     optional int64 bigint_col;
   }
+
+min_max_is_nan.parquet:
+Generated by Impala's Parquet writer before the fix for IMPALA-6527. Git hash: 3a049a53
+Created to test the read path for a Parquet file with invalid metadata, namely when
+'max_value' and 'min_value' are both NaN. Contains 2 single-column rows:
+NaN
+42

http://git-wip-us.apache.org/repos/asf/impala/blob/38461775/testdata/data/min_max_is_nan.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/min_max_is_nan.parquet b/testdata/data/min_max_is_nan.parquet
new file mode 100644
index 0000000..7fedf3a
Binary files /dev/null and b/testdata/data/min_max_is_nan.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/38461775/testdata/workloads/functional-query/queries/QueryTest/parquet-invalid-minmax-stats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-invalid-minmax-stats.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-invalid-minmax-stats.test
new file mode 100644
index 0000000..c2044ae
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-invalid-minmax-stats.test
@@ -0,0 +1,32 @@
+====
+---- QUERY
+# IMPALA-6527, IMPALA-6538: NaN values lead to incorrect filtering.
+# When the first value is NaN in a column chunk, Impala might choose it as min_value and
+# max_value for statistics. In this case the min/max filter should be ignored.
+# 'min_max_is_nan' is written by an old writer, therefore it contains invalid statistics.
+select * from min_max_is_nan where val > 0
+---- RESULTS
+42
+====
+---- QUERY
+# IMPALA-6527, IMPALA-6538: NaN values lead to incorrect filtering
+# test equality predicate
+select * from min_max_is_nan where val = 42
+---- RESULTS
+42
+====
+---- QUERY
+# IMPALA-6527: NaN values lead to incorrect filtering
+# test predicate that is true for NaN
+select * from min_max_is_nan where not val >= 0
+---- RESULTS
+NaN
+====
+---- QUERY
+# IMPALA-6527: NaN values lead to incorrect filtering
+# test predicate that is true for NaN
+select * from min_max_is_nan where val != 0
+---- RESULTS
+NaN
+42
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/38461775/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test
index 70b5f27..273dff8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test
@@ -491,3 +491,80 @@ select count(*) from table_for_null_count_test2 where j < 3;
 aggregation(SUM, NumRowGroups): 1
 aggregation(SUM, NumStatsFilteredRowGroups): 0
 ====
+---- QUERY
+# IMPALA-6527, IMPALA-6538: NaN values lead to incorrect filtering.
+# When the first value is NaN in a column chunk, Impala chooses it as min_value and
+# max_value for statistics. In this case the min/max filter should be ignored.
+create table test_nan(val double) stored as parquet;
+insert into test_nan values (cast('NaN' as double)), (42);
+select * from test_nan where val > 0
+---- RESULTS
+42
+====
+---- QUERY
+# IMPALA-6527, IMPALA-6538: NaN values lead to incorrect filtering
+# test equality predicate
+select * from test_nan where val = 42
+---- RESULTS
+42
+====
+---- QUERY
+# IMPALA-6527: NaN values lead to incorrect filtering
+# test predicate that is true for NaN
+select * from test_nan where not val >= 0
+---- RESULTS
+NaN
+====
+---- QUERY
+# IMPALA-6527: NaN values lead to incorrect filtering
+# test predicate that is true for NaN
+select * from test_nan where val != 0
+---- RESULTS
+NaN
+42
+====
+---- QUERY
+# Statistics filtering must not filter out row groups if predicate can be true for NaN
+create table test_nan_true_predicate(val double) stored as parquet;
+insert into test_nan_true_predicate values (10), (20), (cast('NaN' as double));
+select * from test_nan_true_predicate where not val >= 0
+---- RESULTS
+NaN
+====
+---- QUERY
+# NaN is the last element, predicate is true for NaN and value
+select * from test_nan_true_predicate where not val >= 20
+---- RESULTS
+10
+NaN
+====
+---- QUERY
+# NaN is the last element, predicate is true for NaN and value
+select * from test_nan_true_predicate where val != 10
+---- RESULTS
+20
+NaN
+====
+---- QUERY
+# Test the case when NaN is inserted between two values
+# Test predicate true for NaN and false for the values
+create table test_nan_in_the_middle(val double) stored as parquet;
+insert into test_nan_in_the_middle values (10), (cast('NaN' as double)), (20);
+select * from test_nan_in_the_middle where not val >= 0
+---- RESULTS
+NaN
+====
+---- QUERY
+# NaN in the middle, predicate true for NaN and value
+select * from test_nan_in_the_middle where not val >= 20
+---- RESULTS
+10
+NaN
+====
+---- QUERY
+# NaN in the middle, '!=' should return NaN and value
+select * from test_nan_in_the_middle where val != 10
+---- RESULTS
+NaN
+20
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/38461775/tests/query_test/test_parquet_stats.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py
index 00afd09..3f8cd2f 100644
--- a/tests/query_test/test_parquet_stats.py
+++ b/tests/query_test/test_parquet_stats.py
@@ -69,3 +69,19 @@ class TestParquetStats(ImpalaTestSuite):
     # skipped inside a fragment, so we ensure that the tests run in a single fragment.
     vector.get_value('exec_option')['num_nodes'] = 1
     self.run_test_case('QueryTest/parquet-deprecated-stats', vector, unique_database)
+
+  def test_invalid_stats(self, vector, unique_database):
+    """IMPALA-6538" Test that reading parquet files with statistics with invalid
+    'min_value'/'max_value' fields works correctly. 'min_value' and 'max_value' are both
+    NaNs, therefore we need to ignore them"""
+    table_name = 'min_max_is_nan'
+    self.client.execute('create table %s.%s (val double) stored as parquet' %
+                       (unique_database, table_name))
+    table_location = get_fs_path('/test-warehouse/%s.db/%s' %
+                                 (unique_database, table_name))
+    local_file = os.path.join(os.environ['IMPALA_HOME'],
+                              'testdata/data/min_max_is_nan.parquet')
+    assert os.path.isfile(local_file)
+    check_call(['hdfs', 'dfs', '-copyFromLocal', local_file, table_location])
+    self.client.execute('invalidate metadata %s.%s' % (unique_database, table_name))
+    self.run_test_case('QueryTest/parquet-invalid-minmax-stats', vector, unique_database)
\ No newline at end of file