You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/04/09 15:45:06 UTC

[impala] branch master updated (e0ed7d3 -> 5989900)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from e0ed7d3  IMPALA-8632: Add support for self-event detection for insert events
     new 5e69ae1  IMPALA-9612: Fix race condition in RuntimeFilter::WaitForArrival
     new 5989900  IMPALA-9618: fix some usability issues with dev env

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/CMakeLists.txt         |   2 +
 be/src/runtime/runtime-filter-test.cc | 103 +++++++++++++++++++++++++++++++++
 be/src/runtime/runtime-filter.cc      |  33 +++++++----
 be/src/runtime/runtime-filter.h       |   7 +++
 bin/bootstrap_system.sh               |   3 +-
 bin/impala-config.sh                  |   4 +-
 buildall.sh                           |   4 ++
 testdata/bin/cache_tables.py          | 105 ----------------------------------
 testdata/cluster/admin                |  23 ++++----
 tests/comparison/cluster.py           |   5 +-
 10 files changed, 154 insertions(+), 135 deletions(-)
 create mode 100644 be/src/runtime/runtime-filter-test.cc
 delete mode 100755 testdata/bin/cache_tables.py


[impala] 01/02: IMPALA-9612: Fix race condition in RuntimeFilter::WaitForArrival

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5e69ae1d7dc113bbcc8d7d75e3b1b5244e76f76a
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Apr 6 22:24:05 2020 -0700

    IMPALA-9612: Fix race condition in RuntimeFilter::WaitForArrival
    
    In function RuntimeFilter::WaitForArrival, there is a race condition
    where the condition variable arrival_cv_ may be signaled right after
    the thread enters the wait loop but before it calls
    arrival_cv_.WaitFor(). The notification is then lost, which can cause
    the runtime filter to wait for the entire RUNTIME_FILTER_WAIT_TIME_MS
    even though the filter arrived or was canceled earlier than that.
    This commit avoids the race condition by making
    RuntimeFilter::SetFilter and RuntimeFilter::Cancel acquire
    arrival_mutex_ before checking and updating arrival_time_, and
    release arrival_mutex_ before signaling arrival_cv_. Because
    WaitForArrival waits on arrival_cv_ while holding arrival_mutex_
    (which WaitFor() releases atomically), an arrival or cancellation can
    no longer be recorded and signaled inside that window.
    
    Testing:
    - Add a new BE test, runtime-filter-test.cc.
    - Pass core tests.
    
    Change-Id: I7dffa626103ef0af06ad1e89231b0d2ee54bb94a
    Reviewed-on: http://gerrit.cloudera.org:8080/15673
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/CMakeLists.txt         |   2 +
 be/src/runtime/runtime-filter-test.cc | 103 ++++++++++++++++++++++++++++++++++
 be/src/runtime/runtime-filter.cc      |  33 +++++++----
 be/src/runtime/runtime-filter.h       |   7 +++
 4 files changed, 133 insertions(+), 12 deletions(-)

diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 6990421..12e279e 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -98,6 +98,7 @@ add_library(RuntimeTests STATIC
   multi-precision-test.cc
   raw-value-test.cc
   row-batch-serialize-test.cc
+  runtime-filter-test.cc
   string-buffer-test.cc
   string-compare-test.cc
   string-search-test.cc
@@ -131,6 +132,7 @@ ADD_UNIFIED_BE_LSAN_TEST(hdfs-fs-cache-test "HdfsFsCacheTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(tmp-file-mgr-test "TmpFileMgrTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(row-batch-serialize-test "RowBatchSerializeTest.*")
 # Exception to unified be tests: Custom main function with global Frontend object
+ADD_UNIFIED_BE_LSAN_TEST(runtime-filter-test "RuntimeFilterTest.*")
 ADD_BE_LSAN_TEST(row-batch-test)
 # Exception to unified be tests: Custom main function with global Frontend object
 ADD_BE_LSAN_TEST(collection-value-builder-test)
diff --git a/be/src/runtime/runtime-filter-test.cc b/be/src/runtime/runtime-filter-test.cc
new file mode 100644
index 0000000..76680a5
--- /dev/null
+++ b/be/src/runtime/runtime-filter-test.cc
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/thread/thread.hpp>
+
+#include "common/init.h"
+#include "common/object-pool.h"
+#include "runtime/runtime-filter.h"
+#include "runtime/runtime-filter.inline.h"
+#include "testutil/gtest-util.h"
+#include "util/stopwatch.h"
+
+#include "common/names.h"
+
+using namespace impala;
+
+namespace impala {
+
+class RuntimeFilterTest : public testing::Test {
+ protected:
+  ObjectPool pool_;
+  MemTracker tracker_;
+
+  virtual void SetUp() {}
+
+  virtual void TearDown() { pool_.Clear(); }
+
+  void SetDelay(RuntimeFilter* rf, int64_t delay) { rf->injection_delay_ = delay; }
+};
+
+struct TestConfig {
+  RuntimeFilter* runtime_filter;
+  int64_t injection_delay;
+  int64_t wait_for_ms;
+  MinMaxFilter* min_max_filter;
+};
+
+// Test that RuntimeFilter stop waiting after it is canceled.
+// See IMPALA-9612.
+TEST_F(RuntimeFilterTest, Canceled) {
+  TRuntimeFilterDesc desc;
+  desc.__set_type(TRuntimeFilterType::MIN_MAX);
+  RuntimeFilter* rf = pool_.Add(new RuntimeFilter(desc, desc.filter_size_bytes));
+  TestConfig tc = {rf, 500, 1000, nullptr};
+
+  SetDelay(rf, tc.injection_delay);
+  MonotonicStopWatch sw;
+  thread_group workers;
+
+  sw.Start();
+  workers.add_thread(
+      new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); }));
+  SleepForMs(100); // give waiting thread a head start
+  workers.add_thread(new thread([&tc] { tc.runtime_filter->Cancel(); }));
+  workers.join_all();
+  sw.Stop();
+
+  ASSERT_GE(tc.runtime_filter->arrival_delay_ms(), tc.injection_delay);
+  ASSERT_LT(sw.ElapsedTime(), (tc.injection_delay + tc.wait_for_ms) * 1000000);
+}
+
+// Test that RuntimeFilter stop waiting after the filter arrived.
+// See IMPALA-9612.
+TEST_F(RuntimeFilterTest, Arrived) {
+  TRuntimeFilterDesc desc;
+  desc.__set_type(TRuntimeFilterType::MIN_MAX);
+  RuntimeFilter* rf = pool_.Add(new RuntimeFilter(desc, desc.filter_size_bytes));
+  MinMaxFilter* mmf =
+      MinMaxFilter::Create(ColumnType(PrimitiveType::TYPE_BOOLEAN), &pool_, &tracker_);
+  TestConfig tc = {rf, 500, 1000, mmf};
+
+  SetDelay(rf, tc.injection_delay);
+  MonotonicStopWatch sw;
+  thread_group workers;
+
+  sw.Start();
+  workers.add_thread(
+      new thread([&tc] { tc.runtime_filter->WaitForArrival(tc.wait_for_ms); }));
+  SleepForMs(100); // give waiting thread a head start
+  workers.add_thread(
+      new thread([&tc] { tc.runtime_filter->SetFilter(nullptr, tc.min_max_filter); }));
+  workers.join_all();
+  sw.Stop();
+
+  ASSERT_GE(tc.runtime_filter->arrival_delay_ms(), tc.injection_delay);
+  ASSERT_LT(sw.ElapsedTime(), (tc.injection_delay + tc.wait_for_ms) * 1000000);
+}
+
+} // namespace impala
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 4b013a6..1ff0a13 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -26,17 +26,20 @@ using namespace impala;
 const char* RuntimeFilter::LLVM_CLASS_NAME = "class.impala::RuntimeFilter";
 
 void RuntimeFilter::SetFilter(BloomFilter* bloom_filter, MinMaxFilter* min_max_filter) {
-  DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times.";
-  DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
-  if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled.
-  if (is_bloom_filter()) {
-    bloom_filter_.Store(bloom_filter);
-  } else {
-    DCHECK(is_min_max_filter());
-    min_max_filter_.Store(min_max_filter);
+  {
+    unique_lock<mutex> l(arrival_mutex_);
+    DCHECK(!HasFilter()) << "SetFilter() should not be called multiple times.";
+    DCHECK(bloom_filter_.Load() == nullptr && min_max_filter_.Load() == nullptr);
+    if (arrival_time_.Load() != 0) return; // The filter may already have been cancelled.
+    if (is_bloom_filter()) {
+      bloom_filter_.Store(bloom_filter);
+    } else {
+      DCHECK(is_min_max_filter());
+      min_max_filter_.Store(min_max_filter);
+    }
+    arrival_time_.Store(MonotonicMillis());
+    has_filter_.Store(true);
   }
-  arrival_time_.Store(MonotonicMillis());
-  has_filter_.Store(true);
   arrival_cv_.NotifyAll();
 }
 
@@ -64,8 +67,11 @@ void RuntimeFilter::Or(RuntimeFilter* other) {
 }
 
 void RuntimeFilter::Cancel() {
-  if (arrival_time_.Load() != 0) return;
-  arrival_time_.Store(MonotonicMillis());
+  {
+    unique_lock<mutex> l(arrival_mutex_);
+    if (arrival_time_.Load() != 0) return;
+    arrival_time_.Store(MonotonicMillis());
+  }
   arrival_cv_.NotifyAll();
 }
 
@@ -75,6 +81,9 @@ bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
     int64_t ms_since_registration = MonotonicMillis() - registration_time_;
     int64_t ms_remaining = timeout_ms - ms_since_registration;
     if (ms_remaining <= 0) break;
+#ifndef NDEBUG
+    if (injection_delay_ > 0) SleepForMs(injection_delay_);
+#endif
     arrival_cv_.WaitFor(l, ms_remaining * MICROS_PER_MILLI);
   }
   return arrival_time_.Load() != 0;
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 5108f83..e2a4a67 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -28,6 +28,7 @@
 namespace impala {
 
 class BloomFilter;
+class RuntimeFilterTest;
 
 /// RuntimeFilters represent set-membership predicates that are computed during query
 /// execution (rather than during planning). They can then be sent to other operators to
@@ -117,6 +118,8 @@ class RuntimeFilter {
   static const char* LLVM_CLASS_NAME;
 
  private:
+  friend class RuntimeFilterTest;
+
   /// Membership bloom_filter. May be NULL even after arrival_time_ is set, meaning that
   /// it does not filter any rows, either because it was not created
   /// (filter_desc_.bloom_filter is false), there was not enough memory, or the false
@@ -148,5 +151,9 @@ class RuntimeFilter {
   /// Signalled when a filter arrives or the filter is cancelled. Paired with
   /// 'arrival_mutex_'
   mutable ConditionVariable arrival_cv_;
+
+  /// Injection delay for WaitForArrival. Used in testing only.
+  /// See IMPALA-9612.
+  int64_t injection_delay_ = 0;
 };
 }


[impala] 02/02: IMPALA-9618: fix some usability issues with dev env

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5989900ae81a98d6977bdd60f2281da47e9f69b7
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Apr 7 17:08:00 2020 -0500

    IMPALA-9618: fix some usability issues with dev env
    
    Default IMPALA_HOME to the source directory (derived from the
    script's location) in bootstrap_system.sh and buildall.sh when it is
    not already set.
    
    Delete the cache_tables.py script and the MINI_DFS_BASE_DATA_DIR
    config variable, both of which had bit-rotted and were unused.
    
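    Allow setting IMPALA_CLUSTER_NODES_DIR to put the minicluster
    nodes, most importantly their data, in a different location, e.g.
    on a different filesystem. It defaults to
    $IMPALA_HOME/testdata/cluster/cdh$CDH_MAJOR_VERSION, the location
    that was previously hard-coded in testdata/cluster/admin.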
    
    Testing:
    I set up a dev environment using this code and was able to
    load data and run some tests.
    
    Change-Id: Ibd8b42a6d045d73e3ea29015aa6ccbbde278eec7
    Reviewed-on: http://gerrit.cloudera.org:8080/15687
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/bootstrap_system.sh      |   3 +-
 bin/impala-config.sh         |   4 +-
 buildall.sh                  |   4 ++
 testdata/bin/cache_tables.py | 105 -------------------------------------------
 testdata/cluster/admin       |  23 +++++-----
 tests/comparison/cluster.py  |   5 +--
 6 files changed, 21 insertions(+), 123 deletions(-)

diff --git a/bin/bootstrap_system.sh b/bin/bootstrap_system.sh
index 115a9eb..3f32a5e 100755
--- a/bin/bootstrap_system.sh
+++ b/bin/bootstrap_system.sh
@@ -44,7 +44,8 @@
 
 set -eu -o pipefail
 
-: ${IMPALA_HOME:=~/Impala}
+: ${IMPALA_HOME:=$(cd "$(dirname $0)"/..; pwd)}
+export IMPALA_HOME
 
 if [[ -t 1 ]] # if on an interactive terminal
 then
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index be6d025..66bf94b 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -366,6 +366,7 @@ export EXTERNAL_LISTEN_HOST="${EXTERNAL_LISTEN_HOST-0.0.0.0}"
 export DEFAULT_FS="${DEFAULT_FS-hdfs://${INTERNAL_LISTEN_HOST}:20500}"
 export WAREHOUSE_LOCATION_PREFIX="${WAREHOUSE_LOCATION_PREFIX-}"
 export LOCAL_FS="file:${WAREHOUSE_LOCATION_PREFIX}"
+export IMPALA_CLUSTER_NODES_DIR="${IMPALA_CLUSTER_NODES_DIR-$IMPALA_HOME/testdata/cluster/cdh$CDH_MAJOR_VERSION}"
 
 ESCAPED_IMPALA_HOME=$(sed "s/[^0-9a-zA-Z]/_/g" <<< "$IMPALA_HOME")
 if $USE_CDP_HIVE; then
@@ -612,7 +613,6 @@ HADOOP_CLASSPATH="$LZO_JAR_PATH"
 # minicluster.
 HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:${HADOOP_HOME}/share/hadoop/tools/lib/*"
 
-export MINI_DFS_BASE_DATA_DIR="$IMPALA_HOME/cdh-${CDH_MAJOR_VERSION}-hdfs-data"
 export PATH="$HADOOP_HOME/bin:$PATH"
 
 export SENTRY_HOME="$CDH_COMPONENTS_HOME/sentry-${IMPALA_SENTRY_VERSION}"
@@ -802,7 +802,7 @@ echo "HADOOP_HOME             = $HADOOP_HOME"
 echo "HADOOP_CONF_DIR         = $HADOOP_CONF_DIR"
 echo "HADOOP_INCLUDE_DIR      = $HADOOP_INCLUDE_DIR"
 echo "HADOOP_LIB_DIR          = $HADOOP_LIB_DIR"
-echo "MINI_DFS_BASE_DATA_DIR  = $MINI_DFS_BASE_DATA_DIR"
+echo "IMPALA_CLUSTER_NODES_DIR= $IMPALA_CLUSTER_NODES_DIR"
 echo "HIVE_HOME               = $HIVE_HOME"
 echo "HIVE_CONF_DIR           = $HIVE_CONF_DIR"
 echo "HIVE_SRC_DIR            = $HIVE_SRC_DIR"
diff --git a/buildall.sh b/buildall.sh
index 08dfa3b..a697c20 100755
--- a/buildall.sh
+++ b/buildall.sh
@@ -18,6 +18,10 @@
 # under the License.
 
 set -euo pipefail
+
+: ${IMPALA_HOME:=$(cd "$(dirname $0)"; pwd)}
+export IMPALA_HOME
+
 . $IMPALA_HOME/bin/report_build_error.sh
 setup_report_build_error
 
diff --git a/testdata/bin/cache_tables.py b/testdata/bin/cache_tables.py
deleted file mode 100755
index 36eb0df..0000000
--- a/testdata/bin/cache_tables.py
+++ /dev/null
@@ -1,105 +0,0 @@
-#!/usr/bin/env impala-python
-##############################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-##############################################################################
-#
-# This script will warm up the buffer cache with the tables required to run the input
-# query.  This only works on a mini-dfs cluster.  This is remarkably difficult to do
-# since hdfs which tries to hide the details of the block locations from users.
-# The only way to do this is to
-#   1. use the java APIs (deprecated, of course) to extract the block ids.
-#   2. find the files with those block ids on the file system and read them
-#
-# First run testdata/bin/generate-block-ids.sh.  This will output the block locations
-# to testdata/block-ids.  This file is good as long as the mini-dfs cluster does not
-# get new files.  If the block-ids file is not there, this script will run
-# generate-block-ids.sh.
-#
-# Run this script, passing it the query and it will go read every replica of every
-# block of every table in the query.
-import math
-import os
-import re
-import sys
-import subprocess
-import tempfile
-from optparse import OptionParser
-
-# Options
-parser = OptionParser()
-parser.add_option("-q", "--query", dest="query", default = "",
-                  help="Query to run.  If none specified, runs all queries.")
-
-(options, args) = parser.parse_args()
-
-block_ids_file = 'testdata/block-ids'
-data_node_root = os.environ['MINI_DFS_BASE_DATA_DIR'] + '/dfs/data'
-block_ids = {}
-
-# Parse the block ids file to all the block ids for all the tables
-# the format of the file is:
-# <table name>: <block_id1> <block_id2> <etc>
-def parse_block_ids():
-  full_path = os.environ['IMPALA_HOME'] + "/" + block_ids_file;
-  if not os.path.isfile(full_path):
-    cmd = os.environ['IMPALA_HOME'] + '/testdata/bin/generate-block-ids.sh'
-    os.system(cmd)
-
-  if not os.path.isfile(full_path):
-    raise Exception("Could not find/generate block id files: " + full_path)
-
-  f = open(full_path);
-  for line in f:
-    tokens = line.split(':')
-    blocks = tokens[1].strip().split(' ')
-    block_ids[tokens[0].strip()] = blocks
-  
-# Parse for the tables used in this query
-def parse_tables(query):
-  table_predecessor = ['from', 'join']
-  tokens = query.split(' ')
-  tables = []
-  next_is_table = False
-  for t in tokens:
-    t = t.lower()
-    if next_is_table:
-      tables.append(t)
-      next_is_table = False
-    if t in table_predecessor:
-      next_is_table = True
-  return tables
-
-# Warm the buffer cache by cat-ing all the blocks to /dev/null
-def warm_buffer_cache(table):
-  if table not in block_ids:
-    raise Exception("Table not found: " + table)
-
-  blocks = block_ids[table]
-  for block in blocks:
-    cmd = 'find %s -type f -name blk_%s* -exec cat {} > /dev/null \;' % \
-          (data_node_root, block)
-    os.system(cmd)
-
-tables = parse_tables(options.query)
-parse_block_ids()
-
-if len(tables) == 0:
-  raise Exception("Could not parse tables in: " + options.query)
-
-for table in tables:
-  warm_buffer_cache(table)
diff --git a/testdata/cluster/admin b/testdata/cluster/admin
index dee8dbf..d777133 100755
--- a/testdata/cluster/admin
+++ b/testdata/cluster/admin
@@ -48,7 +48,6 @@ done
 shift $(($OPTIND-1))
 
 DIR=$(dirname $0)
-NODES_DIR="$DIR/cdh$CDH_MAJOR_VERSION"
 NODE_COUNT=3
 if [[ "$TARGET_FILESYSTEM" == "hdfs" && "$ERASURE_CODING" = true ]]; then
   NODE_COUNT=5
@@ -191,13 +190,13 @@ function is_kerberized {
 
 function cluster_exists {
   # Just use the first node as an indicator...
-  if [[ ! -e "$NODES_DIR/${NODE_PREFIX}1" ]]; then
+  if [[ ! -e "$IMPALA_CLUSTER_NODES_DIR/${NODE_PREFIX}1" ]]; then
     return 1
   fi
 }
 
 function create_cluster {
-  mkdir -p "$NODES_DIR"
+  mkdir -p "$IMPALA_CLUSTER_NODES_DIR"
 
   # Used to populate config templates later
   GROUP=$(id -gn)
@@ -384,7 +383,7 @@ function exec_init_script {
   local CMD="$1"
 
   local PIDS=()
-  for SCRIPT in $(find "$NODES_DIR" -path "*/$NODE_PREFIX*/etc/init.d/$SCRIPT_NAME" \
+  for SCRIPT in $(find "$IMPALA_CLUSTER_NODES_DIR" -path "*/$NODE_PREFIX*/etc/init.d/$SCRIPT_NAME" \
       $FIND_EXECUTABLE_FILTER -type f); do
     if "$SCRIPT" status &>/dev/null; then
       RUNNING=true
@@ -419,7 +418,7 @@ function check_cluster_status {
 
   ROLE_COUNT=0
   NOT_RUNNING=()
-  for NODE_DIR in "$NODES_DIR/$NODE_PREFIX"*; do
+  for NODE_DIR in "$IMPALA_CLUSTER_NODES_DIR/$NODE_PREFIX"*; do
     for SERVICE in ${SUPPORTED_SERVICES[@]-}; do
       for SCRIPT in $(find "$NODE_DIR" -path "*/etc/init.d/$SERVICE*" $FIND_EXECUTABLE_FILTER \
           -type f); do
@@ -472,30 +471,30 @@ function restart {
 
 function delete_data {
   # Delete namenode, datanode and KMS data while preserving directory structure.
-  rm -rf "$NODES_DIR/$NODE_PREFIX"*/data/dfs/{nn,dn}/*
-  rm -f "$NODES_DIR/$NODE_PREFIX"*/data/kms.keystore
+  rm -rf "$IMPALA_CLUSTER_NODES_DIR/$NODE_PREFIX"*/data/dfs/{nn,dn}/*
+  rm -f "$IMPALA_CLUSTER_NODES_DIR/$NODE_PREFIX"*/data/kms.keystore
   delete_kudu_data
 }
 
 function delete_kudu_data {
-  rm -rf "$NODES_DIR/$NODE_PREFIX"*/var/lib/kudu/{master,ts}/*
+  rm -rf "$IMPALA_CLUSTER_NODES_DIR/$NODE_PREFIX"*/var/lib/kudu/{master,ts}/*
 }
 
 function delete_cluster {
   pkill -u $USER -f $KILL_CLUSTER_MARKER || true
-  rm -rf "$NODES_DIR"
+  rm -rf "$IMPALA_CLUSTER_NODES_DIR"
 }
 
 function get_node_dir {
   if $IS_OSX; then
-    greadlink -f "$NODES_DIR/$1"
+    greadlink -f "$IMPALA_CLUSTER_NODES_DIR/$1"
   else
-    readlink -f "$NODES_DIR/$1"
+    readlink -f "$IMPALA_CLUSTER_NODES_DIR/$1"
   fi
 }
 
 function get_hadoop_client_conf_dir {
-  echo "$NODES_DIR/$NODE_PREFIX"1/etc/hadoop/conf
+  echo "$IMPALA_CLUSTER_NODES_DIR/$NODE_PREFIX"1/etc/hadoop/conf
 }
 
 COMMAND=$1
diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index 4951d51..6f850e2 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -209,9 +209,8 @@ class MiniCluster(Cluster):
       shutil.copy(os.path.join(other_conf_dir, file_name), self._local_hadoop_conf_dir)
 
   def _get_node_conf_dir(self):
-    return os.path.join(os.environ["IMPALA_HOME"], "testdata", "cluster",
-                        "cdh%s" % os.environ["CDH_MAJOR_VERSION"], "node-1",
-                        "etc", "hadoop", "conf")
+    return os.path.join(os.environ["IMPALA_CLUSTER_NODES_DIR"],
+                        "node-1", "etc", "hadoop", "conf")
 
   def _get_other_conf_dir(self):
     return os.path.join(os.environ["IMPALA_HOME"], "fe", "src", "test",