You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/01/05 15:32:14 UTC

[1/5] incubator-impala git commit: IMPALA-4689: Fix computation of last active time

Repository: incubator-impala
Updated Branches:
  refs/heads/master 756231880 -> 6d15f0377


IMPALA-4689: Fix computation of last active time

The last active time in impala-server.cc#L1806 is in milliseconds, but
the TimestampValue c'tor expects seconds. This change also renames some
variables to make their meaning more explicit, aiming to prevent similar
bugs in the future.

This change also fixes a bug that occurred when the operating system
PIDs wrapped around during startup of the local minicluster. In that
case the first impalad would not be the one with the smallest PID, and
ImpalaCluster.get_first_impalad() would return the wrong one.

I ran git-clang-format on the change.

Change-Id: I283564c8d8e145d44d9493f4201555d3a1087edf
Reviewed-on: http://gerrit.cloudera.org:8080/5546
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/25ebf586
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/25ebf586
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/25ebf586

Branch: refs/heads/master
Commit: 25ebf586e00adc058412fd083b5c4768fa16cc7d
Parents: 7562318
Author: Lars Volker <lv...@cloudera.com>
Authored: Fri Dec 16 17:04:04 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Jan 4 12:12:04 2017 +0000

----------------------------------------------------------------------
 be/src/service/impala-http-handler.cc         |  2 +-
 be/src/service/impala-server.cc               | 11 ++++++-----
 be/src/service/impala-server.h                |  2 +-
 be/src/service/query-exec-state.cc            | 14 +++++++-------
 be/src/service/query-exec-state.h             | 18 ++++++++++--------
 tests/common/impala_cluster.py                |  6 +++++-
 tests/custom_cluster/test_query_expiration.py |  8 +++++---
 7 files changed, 35 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index a324b1f..2fb52e7 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -346,7 +346,7 @@ void ImpalaHttpHandler::QueryStateToJson(const ImpalaServer::QueryStateRecord& r
   value->AddMember("waiting", waiting, document->GetAllocator());
   value->AddMember("executing", !waiting, document->GetAllocator());
 
-  int64_t waiting_time = impala::UnixMillis() - record.last_active_time;
+  int64_t waiting_time = impala::UnixMillis() - record.last_active_time_ms;
   string waiting_time_str = "";
   if (waiting_time > 0) {
     waiting_time_str = PrettyPrinter::Print(waiting_time, TUnit::TIME_MS);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index e159fa2..722baca 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1604,7 +1604,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat
         plan_exec_info.fragments.begin(), plan_exec_info.fragments.end());
   }
   all_rows_returned = exec_state.eos();
-  last_active_time = exec_state.last_active();
+  last_active_time_ms = exec_state.last_active_ms();
   request_pool = exec_state.request_pool();
 }
 
@@ -1784,7 +1784,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
           // Use a non-zero timeout, if one exists
           timeout_s = max(FLAGS_idle_query_timeout, timeout_s);
         }
-        int64_t expiration = query_state->last_active() + (timeout_s * 1000L);
+        int64_t expiration = query_state->last_active_ms() + (timeout_s * 1000L);
         if (now < expiration) {
           // If the real expiration date is in the future we may need to re-insert the
           // query's expiration event at its correct location.
@@ -1801,9 +1801,10 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
           }
         } else if (!query_state->is_active()) {
           // Otherwise time to expire this query
-          VLOG_QUERY << "Expiring query due to client inactivity: "
-                     << expiration_event->second << ", last activity was at: "
-                     << TimestampValue(query_state->last_active()).DebugString();
+          VLOG_QUERY
+              << "Expiring query due to client inactivity: " << expiration_event->second
+              << ", last activity was at: "
+              << TimestampValue(query_state->last_active_ms() / 1000).DebugString();
           const string& err_msg = Substitute(
               "Query $0 expired due to client inactivity (timeout is $1)",
               PrintId(expiration_event->second),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 0697357..588a5c3 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -494,7 +494,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
     bool all_rows_returned;
 
     // The most recent time this query was actively being processed, in Unix milliseconds.
-    int64_t last_active_time;
+    int64_t last_active_time_ms;
 
     /// Request pool to which the request was submitted for admission, or an empty string
     /// if this request doesn't have a pool.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 6fa6d4a..4a9d8f6 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -61,11 +61,11 @@ static const string PER_HOST_VCORES_KEY = "Estimated Per-Host VCores";
 static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
 static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats";
 
-ImpalaServer::QueryExecState::QueryExecState(
-    const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
-    ImpalaServer* server, shared_ptr<SessionState> session)
+ImpalaServer::QueryExecState::QueryExecState(const TQueryCtx& query_ctx,
+    ExecEnv* exec_env, Frontend* frontend, ImpalaServer* server,
+    shared_ptr<SessionState> session)
   : query_ctx_(query_ctx),
-    last_active_time_(numeric_limits<int64_t>::max()),
+    last_active_time_ms_(numeric_limits<int64_t>::max()),
     ref_count_(0L),
     child_query_executor_(new ChildQueryExecutor),
     exec_env_(exec_env),
@@ -74,7 +74,7 @@ ImpalaServer::QueryExecState::QueryExecState(
     schedule_(NULL),
     coord_(NULL),
     result_cache_max_size_(-1),
-    profile_(&profile_pool_, "Query"),  // assign name w/ id after planning
+    profile_(&profile_pool_, "Query"), // assign name w/ id after planning
     server_profile_(&profile_pool_, "ImpalaServer"),
     summary_profile_(&profile_pool_, "Summary"),
     is_cancelled_(false),
@@ -994,7 +994,7 @@ void ImpalaServer::QueryExecState::SetCreateTableAsSelectResultSet() {
 void ImpalaServer::QueryExecState::MarkInactive() {
   client_wait_sw_.Start();
   lock_guard<mutex> l(expiration_data_lock_);
-  last_active_time_ = UnixMillis();
+  last_active_time_ms_ = UnixMillis();
   DCHECK(ref_count_ > 0) << "Invalid MarkInactive()";
   --ref_count_;
 }
@@ -1004,7 +1004,7 @@ void ImpalaServer::QueryExecState::MarkActive() {
   int64_t elapsed_time = client_wait_sw_.ElapsedTime();
   client_wait_timer_->Set(elapsed_time);
   lock_guard<mutex> l(expiration_data_lock_);
-  last_active_time_ = UnixMillis();
+  last_active_time_ms_ = UnixMillis();
   ++ref_count_;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/be/src/service/query-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h
index 20cd7bf..8ac066e 100644
--- a/be/src/service/query-exec-state.h
+++ b/be/src/service/query-exec-state.h
@@ -195,9 +195,9 @@ class ImpalaServer::QueryExecState {
     return exec_request_.analysis_warnings;
   }
 
-  inline int64_t last_active() const {
+  inline int64_t last_active_ms() const {
     boost::lock_guard<boost::mutex> l(expiration_data_lock_);
-    return last_active_time_;
+    return last_active_time_ms_;
   }
 
   /// Returns true if Impala is actively processing this query.
@@ -219,10 +219,12 @@ class ImpalaServer::QueryExecState {
   /// See "Locking" in the class comment for lock acquisition order.
   boost::mutex fetch_rows_lock_;
 
-  /// Protects last_active_time_ and ref_count_. Only held during short function calls -
-  /// no other locks should be acquired while holding this lock.
+  /// Protects last_active_time_ms_ and ref_count_. Only held during short function calls
+  /// - no other locks should be acquired while holding this lock.
   mutable boost::mutex expiration_data_lock_;
-  int64_t last_active_time_;
+
+  /// Stores the last time that the query was actively doing work, in Unix milliseconds.
+  int64_t last_active_time_ms_;
 
   /// ref_count_ > 0 if Impala is currently performing work on this query's behalf. Every
   /// time a client instructs Impala to do work on behalf of this query, the ref count is
@@ -335,11 +337,11 @@ class ImpalaServer::QueryExecState {
   /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
   Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op);
 
-  /// Updates last_active_time_ and ref_count_ to reflect that query is currently not doing
-  /// any work. Takes expiration_data_lock_.
+  /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently not
+  /// doing any work. Takes expiration_data_lock_.
   void MarkInactive();
 
-  /// Updates last_active_time_ and ref_count_ to reflect that query is currently being
+  /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently being
   /// actively processed. Takes expiration_data_lock_.
   void MarkActive();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/tests/common/impala_cluster.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index a5e13b3..d27b89a 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -119,6 +119,10 @@ class ImpalaCluster(object):
         # A process from get_pid_list() no longer exists, continue.
         LOG.info(e)
         continue
+    # If the operating system PIDs wrap around during startup of the local minicluster,
+    # the order of the impalads is incorrect. We order them by their backend port, so that
+    # get_first_impalad() always returns the first one.
+    impalads.sort(key = lambda i: i.service.be_port)
     return impalads, statestored, catalogd
 
 # Represents a process running on a machine and common actions that can be performed
@@ -130,7 +134,7 @@ class Process(object):
         'Process object must be created with valid command line argument list'
 
   def get_pid(self):
-    """Gets the pid of the process. Returns None if the PID cannot be determined"""
+    """Gets the PID of the process. Returns None if the PID cannot be determined"""
     LOG.info("Attempting to find PID for %s" % ' '.join(self.cmd))
     for pid in psutil.get_pid_list():
       try:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/25ebf586/tests/custom_cluster/test_query_expiration.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_query_expiration.py b/tests/custom_cluster/test_query_expiration.py
index 8757116..cbe29ff 100644
--- a/tests/custom_cluster/test_query_expiration.py
+++ b/tests/custom_cluster/test_query_expiration.py
@@ -37,10 +37,10 @@ class TestQueryExpiration(CustomClusterTestSuite):
     assert actual == expected
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--idle_query_timeout=6")
-  def test_query_expiration(self, vector):
+  @CustomClusterTestSuite.with_args("--idle_query_timeout=6 --logbuflevel=-1")
+  def test_query_expiration_test(self, vector):
     """Confirm that single queries expire if not fetched"""
-    impalad = self.cluster.get_any_impalad()
+    impalad = self.cluster.get_first_impalad()
     client = impalad.service.create_beeswax_client()
     num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
     handle = client.execute_async("SELECT SLEEP(1000000)")
@@ -59,6 +59,8 @@ class TestQueryExpiration(CustomClusterTestSuite):
     assert num_expired + 1 == impalad.service.get_metric_value(
       'impala-server.num-queries-expired')
     self._check_num_executing(impalad, 2)
+    self.assert_impalad_log_contains('INFO', "Expiring query due to client inactivity: "
+        "[0-9a-f]+:[0-9a-f]+, last activity was at: \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d")
     impalad.service.wait_for_metric_value('impala-server.num-queries-expired',
                                           num_expired + 3)
 


[2/5] incubator-impala git commit: IMPALA-4653: fix sticky config variable problem

Posted by ta...@apache.org.
IMPALA-4653: fix sticky config variable problem

Previously we could get a developer's shell into a bad state where a
value of a config variable from a previous impala-config.sh version
would override the value from the new impala-config.sh version.

This change adds a new mechanism to override settings locally by adding
settings to impala-config-local.sh. This alternative approach is more
robust, because the config variables will be reset to the intended
values when impala-config.sh is re-sourced.

impala-config-branch.sh can also be used to override settings in a
version-controlled way, e.g. to support having different settings for
different branches.

I did not convert all variables to use this approach, since many people
and Jenkins jobs depend on setting these variables from the environment.
The remaining "sticky" variables are ones where default values should
not change frequently, e.g. source directory locations and build
settings.

Change-Id: I930e2ca825142428d17a6981c77534ab0c8e3489
Reviewed-on: http://gerrit.cloudera.org:8080/5545
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6b90aa3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6b90aa3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6b90aa3a

Branch: refs/heads/master
Commit: 6b90aa3a115f8274a5380945bc4e2b940ef4f711
Parents: 25ebf58
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Dec 16 11:10:09 2016 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 5 01:43:36 2017 +0000

----------------------------------------------------------------------
 .gitignore                  |   1 +
 bin/impala-config-branch.sh |  24 +++++
 bin/impala-config.sh        | 210 +++++++++++++++++++++------------------
 3 files changed, 138 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6b90aa3a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 9d5d981..ebd6bbe 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ org.eclipse.jdt.ui.prefs
 *benchmark_results.csv*
 load-*-generated.sql
 bin/version.info
+bin/impala-config-local.sh
 
 # distcc options
 .impala_compiler_opts

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6b90aa3a/bin/impala-config-branch.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config-branch.sh b/bin/impala-config-branch.sh
new file mode 100644
index 0000000..adad8cf
--- /dev/null
+++ b/bin/impala-config-branch.sh
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Variables in this file override the default values from impala-config.sh.
+# Config changes for release or feature branches should go here so they
+# can be version controlled but not conflict with changes on the master
+# branch.
+#
+# E.g. to override IMPALA_HADOOP_VERSION, you could uncomment this line:
+# IMPALA_HADOOP_VERSION=3.0.0

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6b90aa3a/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index c9b822d..f185861 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -18,11 +18,22 @@
 # Source this file from the $IMPALA_HOME directory to
 # setup your environment. If $IMPALA_HOME is undefined
 # this script will set it to the current working directory.
-
+#
+# Some config variables can be overridden. All overridable variables can be overridden
+# by impala-config-branch.sh, which in turn can be overridden by impala-config-local.sh.
+# Some config variables in the second part of this file (e.g. locations of dependencies,
+# secret keys) can also be overridden by setting environment variables before sourcing
+# this file. We don't support this for variables that change between branches and
+# versions, e.g. version numbers, because it creates a "sticky config variable" problem
+# where an old value stays in effect when switching between branches or rebasing until
+# the developer opens a new shell. We also do not support overriding of some variables
+# that are computed based on the values of other variables.
+#
 # This file must be kept compatible with bash options "set -euo pipefail". Those options
 # will be set by other scripts before sourcing this file. Those options are not set in
 # this script because scripts outside this repository may need to be updated and that
 # is not practical at this time.
+
 export JAVA_HOME="${JAVA_HOME:-/usr/java/default}"
 if [ ! -d "$JAVA_HOME" ]; then
   echo "JAVA_HOME must be set to the location of your JDK!"
@@ -42,38 +53,117 @@ if [ -z "$IMPALA_HOME" ]; then
   fi
 fi
 
-: ${IMPALA_TOOLCHAIN="$IMPALA_HOME/toolchain"}
+export IMPALA_TOOLCHAIN=${IMPALA_TOOLCHAIN-"$IMPALA_HOME/toolchain"}
 if [ -z "$IMPALA_TOOLCHAIN" ]; then
   echo "IMPALA_TOOLCHAIN must be specified. Please set it to a valid directory or"\
        "leave it unset."
   return 1
 fi
 
-# If true, will not call $IMPALA_HOME/bin/bootstrap_toolchain.py.
-: ${SKIP_TOOLCHAIN_BOOTSTRAP=false}
+#######################################################################################
+# Variables that can be overridden by impala-config-*.sh but not by environment vars. #
+# All component versions and other variables that get updated periodically or between #
+# branches go here to avoid the "sticky variable" problem (IMPALA-4653) where the     #
+# variable from a previously-sourced impala-config.sh overrides the new value.        #
+#######################################################################################
 
 # The unique build id of the toolchain to use if bootstrapping. This is generated by the
 # native-toolchain build when publishing its build artifacts. This should be changed when
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-: ${IMPALA_TOOLCHAIN_BUILD_ID=308-96a4cc516e}
+export IMPALA_TOOLCHAIN_BUILD_ID=308-96a4cc516e
+
+# Versions of toolchain dependencies.
+# -----------------------------------
+export IMPALA_AVRO_VERSION=1.7.4-p4
+export IMPALA_BINUTILS_VERSION=2.26-p1
+export IMPALA_BOOST_VERSION=1.57.0
+export IMPALA_BREAKPAD_VERSION=20150612-p1
+export IMPALA_BZIP2_VERSION=1.0.6-p2
+export IMPALA_CMAKE_VERSION=3.2.3-p1
+export IMPALA_CYRUS_SASL_VERSION=2.1.23
+export IMPALA_GCC_VERSION=4.9.2
+export IMPALA_GFLAGS_VERSION=2.0
+export IMPALA_GLOG_VERSION=0.3.2-p2
+export IMPALA_GPERFTOOLS_VERSION=2.5
+export IMPALA_GTEST_VERSION=1.6.0
+export IMPALA_LLVM_VERSION=3.8.0-p1
+export IMPALA_LLVM_ASAN_VERSION=3.8.0-p1
+# Debug builds should use the release+asserts build to get additional coverage.
+# Don't use the LLVM debug build because the binaries are too large to distribute.
+export IMPALA_LLVM_DEBUG_VERSION=3.8.0-asserts-p1
+export IMPALA_LZ4_VERSION=svn
+export IMPALA_OPENLDAP_VERSION=2.4.25
+export IMPALA_OPENSSL_VERSION=0.9.8zf
+export IMPALA_POSTGRES_JDBC_DRIVER_VERSION=9.0-801
+export IMPALA_RAPIDJSON_VERSION=0.11
+export IMPALA_RE2_VERSION=20130115-p1
+export IMPALA_SNAPPY_VERSION=1.1.3
+export IMPALA_SQUEASEL_VERSION=3.3
+# TPC utilities used for test/benchmark data generation.
+export IMPALA_TPC_DS_VERSION=2.1.0
+export IMPALA_TPC_H_VERSION=2.17.0
+export IMPALA_THRIFT_VERSION=0.9.0-p8
+export IMPALA_THRIFT_JAVA_VERSION=0.9.0
+export IMPALA_ZLIB_VERSION=1.2.8
+
+if [[ $OSTYPE == "darwin"* ]]; then
+  IMPALA_CYRUS_SASL_VERSION=2.1.26
+  IMPALA_GPERFTOOLS_VERSION=2.3
+  IMPALA_OPENSSL_VERSION=1.0.1p
+  IMPALA_THRIFT_VERSION=0.9.2
+  IMPALA_THRIFT_JAVA_VERSION=0.9.2
+fi
+
+# Kudu version in the toolchain; provides libkudu_client.so and minicluster binaries.
+export IMPALA_KUDU_VERSION=e018a83
+
+# Kudu version used to identify Java client jar from maven
+export KUDU_JAVA_VERSION=1.2.0-SNAPSHOT
+
+# Versions of Hadoop ecosystem dependencies.
+# ------------------------------------------
+export CDH_MAJOR_VERSION=5
+export IMPALA_HADOOP_VERSION=2.6.0-cdh5.11.0-SNAPSHOT
+export IMPALA_HBASE_VERSION=1.2.0-cdh5.11.0-SNAPSHOT
+export IMPALA_HIVE_VERSION=1.1.0-cdh5.11.0-SNAPSHOT
+export IMPALA_SENTRY_VERSION=1.5.1-cdh5.11.0-SNAPSHOT
+export IMPALA_PARQUET_VERSION=1.5.0-cdh5.11.0-SNAPSHOT
+export IMPALA_LLAMA_MINIKDC_VERSION=1.0.0
+
+# Source the branch and local config override files here to override any
+# variables above or any variables below that allow overriding via environment
+# variable.
+. "$IMPALA_HOME/bin/impala-config-branch.sh"
+if [ -f "$IMPALA_HOME/bin/impala-config-local.sh" ]; then
+  . "$IMPALA_HOME/bin/impala-config-local.sh"
+fi
+
+#########################################################################################
+# Below here are variables that can be overridden by impala-config-*.sh and environment #
+# vars, variables computed based on other variables, and variables that cannot be       #
+# overridden.                                                                           #
+#########################################################################################
+
+# If true, will not call $IMPALA_HOME/bin/bootstrap_toolchain.py.
+export SKIP_TOOLCHAIN_BOOTSTRAP=${SKIP_TOOLCHAIN_BOOTSTRAP-false}
 
 # This flag is used in $IMPALA_HOME/cmake_modules/toolchain.cmake.
 # If it's 0, Impala will be built with the compiler in the toolchain directory.
-: ${USE_SYSTEM_GCC=0}
+export USE_SYSTEM_GCC=${USE_SYSTEM_GCC-0}
 
 # Use ld.gold instead of ld by default to speed up builds.
-: ${USE_GOLD_LINKER=true}
+export USE_GOLD_LINKER=${USE_GOLD_LINKER-true}
 
 # Override the default compiler by setting a path to the new compiler. The default
 # compiler depends on USE_SYSTEM_GCC and IMPALA_GCC_VERSION. The intended use case
 # is to set the compiler to distcc, in that case the user would also set
 # IMPALA_BUILD_THREADS to increase parallelism.
-: ${IMPALA_CXX_COMPILER=default}
+export IMPALA_CXX_COMPILER=${IMPALA_CXX_COMPILER-default}
 
 # If enabled, debug symbols are added to cross-compiled IR.
-: ${ENABLE_IMPALA_IR_DEBUG_INFO=false}
+export ENABLE_IMPALA_IR_DEBUG_INFO=${ENABLE_IMPALA_IR_DEBUG_INFO-false}
 
 if [ -d "$IMPALA_HOME/thirdparty" ]; then
   NO_THIRDPARTY=false
@@ -82,16 +172,8 @@ else
 fi
 # If true, download and use the CDH components from S3 instead of the ones
 # in $IMPALA_HOME/thirdparty.
-: ${DOWNLOAD_CDH_COMPONENTS="$NO_THIRDPARTY"}
-
-export IMPALA_TOOLCHAIN
-export SKIP_TOOLCHAIN_BOOTSTRAP
-export IMPALA_TOOLCHAIN_BUILD_ID
-export USE_SYSTEM_GCC
-export USE_GOLD_LINKER
-export IMPALA_CXX_COMPILER
-export ENABLE_IMPALA_IR_DEBUG_INFO
-export DOWNLOAD_CDH_COMPONENTS
+export DOWNLOAD_CDH_COMPONENTS=${DOWNLOAD_CDH_COMPONENTS-"$NO_THIRDPARTY"}
+
 export IS_OSX="$(if [[ "$OSTYPE" == "darwin"* ]]; then echo true; else echo false; fi)"
 
 # To use a local build of Kudu, set KUDU_BUILD_DIR to the path Kudu was built in and
@@ -104,10 +186,8 @@ export IS_OSX="$(if [[ "$OSTYPE" == "darwin"* ]]; then echo true; else echo fals
 #   cmake <path to Kudu source dir>
 #   make
 #   DESTDIR=$KUDU_CLIENT_DIR make install
-: ${KUDU_BUILD_DIR=}
-: ${KUDU_CLIENT_DIR=}
-export KUDU_BUILD_DIR
-export KUDU_CLIENT_DIR
+export KUDU_BUILD_DIR=${KUDU_BUILD_DIR-}
+export KUDU_CLIENT_DIR=${KUDU_CLIENT_DIR-}
 if [[ -n "$KUDU_BUILD_DIR" && -z "$KUDU_CLIENT_DIR" ]]; then
   echo When KUDU_BUILD_DIR is set KUDU_CLIENT_DIR must also be set. 1>&2
   return 1
@@ -117,8 +197,8 @@ if [[ -z "$KUDU_BUILD_DIR" && -n "$KUDU_CLIENT_DIR" ]]; then
   return 1
 fi
 
-: ${USE_KUDU_DEBUG_BUILD=false}   # Only applies when using Kudu from the toolchain
-export USE_KUDU_DEBUG_BUILD
+# Only applies when using Kudu from the toolchain
+export USE_KUDU_DEBUG_BUILD=${USE_KUDU_DEBUG_BUILD-false}
 
 # Kudu doesn't compile on some old Linux distros. KUDU_IS_SUPPORTED enables building Kudu
 # into the backend. The frontend build is OS independent since it is Java.
@@ -149,7 +229,6 @@ if [[ -z "${KUDU_IS_SUPPORTED-}" ]]; then
 fi
 export KUDU_IS_SUPPORTED
 
-export CDH_MAJOR_VERSION=5
 export HADOOP_LZO="${HADOOP_LZO-$IMPALA_HOME/../hadoop-lzo}"
 export IMPALA_LZO="${IMPALA_LZO-$IMPALA_HOME/../Impala-lzo}"
 export IMPALA_AUX_TEST_HOME="${IMPALA_AUX_TEST_HOME-$IMPALA_HOME/../Impala-auxiliary-tests}"
@@ -244,63 +323,9 @@ else
 fi
 export NUM_CONCURRENT_TESTS="${NUM_CONCURRENT_TESTS-${CORES}}"
 
-# Versions of toolchain dependencies.
-export IMPALA_AVRO_VERSION=1.7.4-p4
-export IMPALA_BINUTILS_VERSION=2.26-p1
-export IMPALA_BOOST_VERSION=1.57.0
-export IMPALA_BREAKPAD_VERSION=20150612-p1
-export IMPALA_BZIP2_VERSION=1.0.6-p2
-export IMPALA_CMAKE_VERSION=3.2.3-p1
-export IMPALA_CYRUS_SASL_VERSION=2.1.23
-export IMPALA_GCC_VERSION=4.9.2
-export IMPALA_GFLAGS_VERSION=2.0
-export IMPALA_GLOG_VERSION=0.3.2-p2
-export IMPALA_GPERFTOOLS_VERSION=2.5
-export IMPALA_GTEST_VERSION=1.6.0
-export IMPALA_LLVM_VERSION=3.8.0-p1
-export IMPALA_LLVM_ASAN_VERSION=3.8.0-p1
-# Debug builds should use the release+asserts build to get additional coverage.
-# Don't use the LLVM debug build because the binaries are too large to distribute.
-export IMPALA_LLVM_DEBUG_VERSION=3.8.0-asserts-p1
-export IMPALA_LZ4_VERSION=svn
-export IMPALA_OPENLDAP_VERSION=2.4.25
-export IMPALA_OPENSSL_VERSION=0.9.8zf
-export IMPALA_POSTGRES_JDBC_DRIVER_VERSION=9.0-801
-export IMPALA_RAPIDJSON_VERSION=0.11
-export IMPALA_RE2_VERSION=20130115-p1
-export IMPALA_SNAPPY_VERSION=1.1.3
-export IMPALA_SQUEASEL_VERSION=3.3
-# TPC utilities used for test/benchmark data generation.
-export IMPALA_TPC_DS_VERSION=2.1.0
-export IMPALA_TPC_H_VERSION=2.17.0
-export IMPALA_THRIFT_VERSION=0.9.0-p8
-export IMPALA_THRIFT_JAVA_VERSION=0.9.0
-export IMPALA_ZLIB_VERSION=1.2.8
-
-# Kudu version in the toolchain; provides libkudu_client.so and minicluster binaries.
-export IMPALA_KUDU_VERSION=e018a83
-
-# Kudu version used to identify Java client jar from maven
-export KUDU_JAVA_VERSION=1.2.0-SNAPSHOT
-
 export KUDU_MASTER="${KUDU_MASTER:-127.0.0.1}"
 export KUDU_MASTER_PORT="${KUDU_MASTER_PORT:-7051}"
 
-if [[ $OSTYPE == "darwin"* ]]; then
-  IMPALA_CYRUS_SASL_VERSION=2.1.26
-  IMPALA_GPERFTOOLS_VERSION=2.3
-  IMPALA_OPENSSL_VERSION=1.0.1p
-  IMPALA_THRIFT_VERSION=0.9.2
-  IMPALA_THRIFT_JAVA_VERSION=0.9.2
-fi
-
-export IMPALA_HADOOP_VERSION=${IMPALA_HADOOP_VERSION:-2.6.0-cdh5.11.0-SNAPSHOT}
-export IMPALA_HBASE_VERSION=${IMPALA_HBASE_VERSION:-1.2.0-cdh5.11.0-SNAPSHOT}
-export IMPALA_HIVE_VERSION=${IMPALA_HIVE_VERSION:-1.1.0-cdh5.11.0-SNAPSHOT}
-export IMPALA_SENTRY_VERSION=${IMPALA_SENTRY_VERSION:-1.5.1-cdh5.11.0-SNAPSHOT}
-export IMPALA_PARQUET_VERSION=${IMPALA_PARQUET_VERSION:-1.5.0-cdh5.11.0-SNAPSHOT}
-export IMPALA_LLAMA_MINIKDC_VERSION=${IMPALA_LLAMA_MINIKDC_VERSION:-1.0.0}
-
 export IMPALA_FE_DIR="$IMPALA_HOME/fe"
 export IMPALA_BE_DIR="$IMPALA_HOME/be"
 export IMPALA_WORKLOAD_DIR="$IMPALA_HOME/testdata/workloads"
@@ -326,10 +351,9 @@ export HADOOP_CONF_DIR="$IMPALA_FE_DIR/src/test/resources"
 export HADOOP_INCLUDE_DIR=${HADOOP_INCLUDE_DIR:-"${HADOOP_HOME}/include"}
 export HADOOP_LIB_DIR=${HADOOP_LIB_DIR:-"${HADOOP_HOME}/lib"}
 
-: ${HADOOP_CLASSPATH=}
 # Please note that the * is inside quotes, thus it won't get expanded by bash but
 # by java, see "Understanding class path wildcards" at http://goo.gl/f0cfft
-export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:${HADOOP_HOME}/share/hadoop/tools/lib/*"
+export HADOOP_CLASSPATH="${HADOOP_CLASSPATH-}:${HADOOP_HOME}/share/hadoop/tools/lib/*"
 # YARN is configured to use LZO so the LZO jar needs to be in the hadoop classpath.
 export LZO_JAR_PATH="$HADOOP_LZO/build/hadoop-lzo-0.4.15.jar"
 HADOOP_CLASSPATH+=":$LZO_JAR_PATH"
@@ -380,12 +404,10 @@ export ASAN_SYMBOLIZER_PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_ASAN_VERSION
 export CLUSTER_DIR="${IMPALA_HOME}/testdata/cluster"
 
 # The number of parallel build processes we should run at a time.
-: ${IMPALA_BUILD_THREADS:="$(nproc)"}
-export IMPALA_BUILD_THREADS
+export IMPALA_BUILD_THREADS=${IMPALA_BUILD_THREADS-"$(nproc)"}
 
 # Additional flags to pass to make or ninja.
-: ${IMPALA_MAKE_FLAGS:=""}
-export IMPALA_MAKE_FLAGS
+export IMPALA_MAKE_FLAGS=${IMPALA_MAKE_FLAGS-}
 
 # Some environments (like the packaging build) might not have $USER set.  Fix that here.
 export USER="${USER-`id -un`}"
@@ -401,12 +423,11 @@ export USER="${USER-`id -un`}"
 # TODO: figure out how to turn this off only the stuff that can't run with it.
 #LIBHDFS_OPTS="-Xcheck:jni -Xcheck:nabounds"
 # - Points to the location of libbackend.so.
-LIBHDFS_OPTS="${LIBHDFS_OPTS:-}"
-LIBHDFS_OPTS="${LIBHDFS_OPTS} -Djava.library.path=${HADOOP_LIB_DIR}/native/"
+export LIBHDFS_OPTS="${LIBHDFS_OPTS:-} -Djava.library.path=${HADOOP_LIB_DIR}/native/"
 # READER BEWARE: This always points to the debug build.
 # TODO: Consider having cmake scripts change this value depending on
 # the build type.
-export LIBHDFS_OPTS="${LIBHDFS_OPTS}:${IMPALA_HOME}/be/build/debug/service"
+LIBHDFS_OPTS="${LIBHDFS_OPTS}:${IMPALA_HOME}/be/build/debug/service"
 
 export ARTISTIC_STYLE_OPTIONS="$IMPALA_BE_DIR/.astylerc"
 
@@ -419,8 +440,7 @@ export JAVA_LIBRARY_PATH="${IMPALA_SNAPPY_PATH}"
 LIB_JAVA=`find "${JAVA_HOME}/"   -name libjava.so | head -1`
 LIB_JSIG=`find "${JAVA_HOME}/"   -name libjsig.so | head -1`
 LIB_JVM=` find "${JAVA_HOME}/"   -name libjvm.so  | head -1`
-LD_LIBRARY_PATH="${LD_LIBRARY_PATH-}"
-LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:`dirname ${LIB_JAVA}`:`dirname ${LIB_JSIG}`"
+export LD_LIBRARY_PATH="${LD_LIBRARY_PATH:-}:`dirname ${LIB_JAVA}`:`dirname ${LIB_JSIG}`"
 LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:`dirname ${LIB_JVM}`"
 LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${HADOOP_LIB_DIR}/native"
 LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${IMPALA_HOME}/be/build/debug/service"
@@ -432,16 +452,12 @@ if [ $USE_SYSTEM_GCC -eq 0 ]; then
   LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${IMPALA_TOOLCHAIN_GCC_LIB}"
 fi
 
-export LD_LIBRARY_PATH
-LD_PRELOAD="${LD_PRELOAD-}"
-export LD_PRELOAD="${LD_PRELOAD}:${LIB_JSIG}"
+export LD_PRELOAD="${LD_PRELOAD-}:${LIB_JSIG}"
 
-CLASSPATH="${CLASSPATH-}"
-CLASSPATH="$IMPALA_FE_DIR/target/dependency:$CLASSPATH"
+export CLASSPATH="$IMPALA_FE_DIR/target/dependency:${CLASSPATH-}"
 CLASSPATH="$IMPALA_FE_DIR/target/classes:$CLASSPATH"
 CLASSPATH="$IMPALA_FE_DIR/src/test/resources:$CLASSPATH"
 CLASSPATH="$LZO_JAR_PATH:$CLASSPATH"
-export CLASSPATH
 
 # Setup aliases
 # Helper alias to script that verifies and merges Gerrit changes


[5/5] incubator-impala git commit: IMPALA-3641: Fix catalogd RPC responses to DROP IF EXISTS.

Posted by ta...@apache.org.
IMPALA-3641: Fix catalogd RPC responses to DROP IF EXISTS.

The main problem was that the catalogd's response to
a DROP IF EXISTS operation included a removed object
that was applied to the requesting impalad's catalog cache.
In particular, a DROP DATABASE IF EXISTS that did not actually
drop anything in the catalogd still returned the object name in
the RPC response as a removed object with the *current* catalog
version (i.e., without incrementing the catalog version).

The above behavior led to a situation where a drop of
a non-existent object overwrote a legitimate entry in
an impalad's CatalogDeltaLog. Recall that the version of the
dropped object was based on the current catalog version
at some point in time, e.g., the same version of a
legitimate entry in the CatalogDeltaLog.

As a reminder, the CatalogDeltaLog protects deletions from
being undone via updates from the statestore. So overwriting
an object in the CatalogDeltaLog can lead to a dropped object
appearing again with certain timing of a statestore update.

Please see the JIRA for an analysis of logging output that
shows the bug and its effect.

The fix is simple: The RPC response of a DROP IF EXISTS should
only contain a removed object if an object was actually
removed from the catalogd.

This fix, however, introduces a new consistency issue (IMPALA-4727).
The new behavior is not ideal, but better than the old behavior,
explained as follows:
The behavior before this patch is problematic because the drop of a
completely unrelated object can affect the consistency of a drop+add
on another object.
The behavior after this patch is that a drop+add may fail in the add
if there is an ill-timed concurrent drop of the same object.

Testing:
- Unfortunately, I have not been able to reproduce the issue
  locally despite vigorous attempts and despite knowing what
  the problem is. Our existing tests seem to reproduce the
  issue pretty reliably, so it's not clear whether a targeted
  test is feasible or needed.
- An exhaustive test run passed.

Change-Id: Icb1f31eb2ecf05b9b51ef4e12e6bb78f44d0cf84
Reviewed-on: http://gerrit.cloudera.org:8080/5556
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6d15f037
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6d15f037
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6d15f037

Branch: refs/heads/master
Commit: 6d15f03777afeb1375927ad9520a5db5bc9d42a1
Parents: 95ed443
Author: Alex Behm <al...@cloudera.com>
Authored: Tue Dec 20 00:12:51 2016 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 5 04:15:30 2017 +0000

----------------------------------------------------------------------
 .../impala/catalog/CatalogServiceCatalog.java   |  21 +--
 .../impala/service/CatalogOpExecutor.java       | 128 +++++++++----------
 2 files changed, 77 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d15f037/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 85d92cb..b51c9aa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -26,12 +26,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.UUID;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.Path;
@@ -45,10 +45,6 @@ import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.log4j.Logger;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.TException;
-
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
@@ -69,6 +65,10 @@ import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.SentryProxy;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -866,14 +866,19 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Renames a table. Equivalent to an atomic drop + add of the table. Returns
-   * the new Table object with an incremented catalog version or null if operation
-   * was not successful.
+   * the new Table object with an incremented catalog version or null if the
+   * drop or add were unsuccessful. If null is returned, then the catalog cache
+   * is in one of the following two states:
+   * 1. Old table was not removed, and new table was not added
+   * 2. Old table was removed, but new table was not added
    */
   public Table renameTable(TTableName oldTableName, TTableName newTableName)
       throws CatalogException {
     // Remove the old table name from the cache and add the new table.
     Db db = getDb(oldTableName.getDb_name());
-    if (db != null) db.removeTable(oldTableName.getTable_name());
+    if (db == null) return null;
+    Table oldTable = db.removeTable(oldTableName.getTable_name());
+    if (oldTable == null) return null;
     return addTable(newTableName.getDb_name(), newTableName.getTable_name());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6d15f037/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index f878b12..d9448b3 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
-import org.apache.impala.common.Reference;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.User;
@@ -85,10 +84,11 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
 import org.apache.impala.thrift.ImpalaInternalServiceConstants;
 import org.apache.impala.thrift.JniCatalogConstants;
-import org.apache.impala.thrift.TAlterTableAddPartitionParams;
 import org.apache.impala.thrift.TAlterTableAddDropRangePartitionParams;
+import org.apache.impala.thrift.TAlterTableAddPartitionParams;
 import org.apache.impala.thrift.TAlterTableAddReplaceColsParams;
 import org.apache.impala.thrift.TAlterTableChangeColParams;
 import org.apache.impala.thrift.TAlterTableDropColParams;
@@ -585,8 +585,9 @@ public class CatalogOpExecutor {
    * version of the serialized table as the version of the catalog update result.
    */
   private static void addTableToCatalogUpdate(Table tbl, TCatalogUpdateResult result) {
-    TCatalogObject updatedCatalogObject = TableToTCatalogObject(tbl);
-    result.setUpdated_catalog_object_DEPRECATED(TableToTCatalogObject(tbl));
+    Preconditions.checkNotNull(tbl);
+    TCatalogObject updatedCatalogObject = tbl.toTCatalogObject();
+    result.setUpdated_catalog_object_DEPRECATED(updatedCatalogObject);
     result.setVersion(updatedCatalogObject.getCatalog_version());
   }
 
@@ -1056,18 +1057,16 @@ public class CatalogOpExecutor {
   private void dropDataSource(TDropDataSourceParams params, TDdlExecResponse resp)
       throws ImpalaException {
     if (LOG.isTraceEnabled()) LOG.trace("Drop DATA SOURCE: " + params.toString());
-    DataSource dataSource = catalog_.getDataSource(params.getData_source());
+    DataSource dataSource = catalog_.removeDataSource(params.getData_source());
     if (dataSource == null) {
       if (!params.if_exists) {
         throw new ImpalaRuntimeException("Data source " + params.getData_source() +
             " does not exists.");
       }
-      // The user specified IF EXISTS and the data source didn't exist, just
-      // return the current catalog version.
+      // No data source was removed.
       resp.result.setVersion(catalog_.getCatalogVersion());
       return;
     }
-    catalog_.removeDataSource(params.getData_source());
     TCatalogObject removedObject = new TCatalogObject();
     removedObject.setType(TCatalogObjectType.DATA_SOURCE);
     removedObject.setData_source(dataSource.toThrift());
@@ -1228,13 +1227,12 @@ public class CatalogOpExecutor {
             String.format(HMS_RPC_ERROR_FORMAT_STR, "dropDatabase"), e);
       }
       Db removedDb = catalog_.removeDb(params.getDb());
-      // If no db was removed as part of this operation just return the current catalog
-      // version.
       if (removedDb == null) {
-        removedObject.setCatalog_version(catalog_.getCatalogVersion());
-      } else {
-        removedObject.setCatalog_version(removedDb.getCatalogVersion());
+        // Nothing was removed from the catalogd's cache.
+        resp.result.setVersion(catalog_.getCatalogVersion());
+        return;
       }
+      removedObject.setCatalog_version(removedDb.getCatalogVersion());
     }
     removedObject.setType(TCatalogObjectType.DATABASE);
     removedObject.setDb(new TDatabase());
@@ -1349,32 +1347,33 @@ public class CatalogOpExecutor {
 
       Table table = catalog_.removeTable(params.getTable_name().db_name,
           params.getTable_name().table_name);
-      if (table != null) {
-        resp.result.setVersion(table.getCatalogVersion());
-        if (table instanceof HdfsTable) {
-          HdfsTable hdfsTable = (HdfsTable) table;
-          if (hdfsTable.isMarkedCached()) {
-            try {
-              HdfsCachingUtil.uncacheTbl(table.getMetaStoreTable());
-            } catch (Exception e) {
-              LOG.error("Unable to uncache table: " + table.getFullName(), e);
-            }
+      if (table == null) {
+        // Nothing was removed from the catalogd's cache.
+        resp.result.setVersion(catalog_.getCatalogVersion());
+        return;
+      }
+      resp.result.setVersion(table.getCatalogVersion());
+      if (table instanceof HdfsTable) {
+        HdfsTable hdfsTable = (HdfsTable) table;
+        if (hdfsTable.isMarkedCached()) {
+          try {
+            HdfsCachingUtil.uncacheTbl(table.getMetaStoreTable());
+          } catch (Exception e) {
+            LOG.error("Unable to uncache table: " + table.getFullName(), e);
           }
-          if (table.getNumClusteringCols() > 0) {
-            for (HdfsPartition partition: hdfsTable.getPartitions()) {
-              if (partition.isMarkedCached()) {
-                try {
-                  HdfsCachingUtil.uncachePartition(partition);
-                } catch (Exception e) {
-                  LOG.error("Unable to uncache partition: " +
-                      partition.getPartitionName(), e);
-                }
+        }
+        if (table.getNumClusteringCols() > 0) {
+          for (HdfsPartition partition: hdfsTable.getPartitions()) {
+            if (partition.isMarkedCached()) {
+              try {
+                HdfsCachingUtil.uncachePartition(partition);
+              } catch (Exception e) {
+                LOG.error("Unable to uncache partition: " +
+                    partition.getPartitionName(), e);
               }
             }
           }
         }
-      } else {
-        resp.result.setVersion(catalog_.getCatalogVersion());
       }
     }
     removedObject.setType(TCatalogObjectType.TABLE);
@@ -2076,17 +2075,25 @@ public class CatalogOpExecutor {
     }
     // Rename the table in the Catalog and get the resulting catalog object.
     // ALTER TABLE/VIEW RENAME is implemented as an ADD + DROP.
-    TCatalogObject newTable = TableToTCatalogObject(
-        catalog_.renameTable(tableName.toThrift(), newTableName.toThrift()));
+    Table newTable = catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
+    if (newTable == null) {
+      // The rename succeeded in the HMS but failed in the catalog cache. The cache is in
+      // an inconsistent state, but can likely be fixed by running "invalidate metadata".
+      throw new ImpalaRuntimeException(String.format(
+          "Table/view rename succeeded in the Hive Metastore, but failed in Impala's " +
+          "Catalog Server. Running 'invalidate metadata <tbl>' on the old table name " +
+          "'%s' and the new table name '%s' may fix the problem." , tableName.toString(),
+          newTableName.toString()));
+    }
+
+    TCatalogObject addedObject = newTable.toTCatalogObject();
     TCatalogObject removedObject = new TCatalogObject();
     removedObject.setType(TCatalogObjectType.TABLE);
-    removedObject.setTable(new TTable());
-    removedObject.getTable().setTbl_name(tableName.getTbl());
-    removedObject.getTable().setDb_name(tableName.getDb());
-    removedObject.setCatalog_version(newTable.getCatalog_version());
+    removedObject.setTable(new TTable(tableName.getDb(), tableName.getTbl()));
+    removedObject.setCatalog_version(addedObject.getCatalog_version());
     response.result.setRemoved_catalog_object_DEPRECATED(removedObject);
-    response.result.setUpdated_catalog_object_DEPRECATED(newTable);
-    response.result.setVersion(newTable.getCatalog_version());
+    response.result.setUpdated_catalog_object_DEPRECATED(addedObject);
+    response.result.setVersion(addedObject.getCatalog_version());
   }
 
   /**
@@ -2655,8 +2662,9 @@ public class CatalogOpExecutor {
       role = catalog_.getSentryProxy().dropRole(requestingUser,
           createDropRoleParams.getRole_name());
       if (role == null) {
-        role = new Role(createDropRoleParams.getRole_name(), Sets.<String>newHashSet());
-        role.setCatalogVersion(catalog_.getCatalogVersion());
+        // Nothing was removed from the catalogd's cache.
+        resp.result.setVersion(catalog_.getCatalogVersion());
+        return;
       }
     } else {
       role = catalog_.getSentryProxy().createRole(requestingUser,
@@ -2839,12 +2847,6 @@ public class CatalogOpExecutor {
     return fsList;
   }
 
-  private static TCatalogObject TableToTCatalogObject(Table table) {
-    if (table != null) return table.toTCatalogObject();
-    return new TCatalogObject(TCatalogObjectType.TABLE,
-        Catalog.INITIAL_CATALOG_VERSION);
-  }
-
    /**
    * Sets the table parameter 'transient_lastDdlTime' to System.currentTimeMillis()/1000
    * in the given msTbl. 'transient_lastDdlTime' is guaranteed to be changed.
@@ -2926,20 +2928,19 @@ public class CatalogOpExecutor {
       }
 
       if (modifiedObjects.first == null) {
-        TCatalogObject thriftTable = TableToTCatalogObject(modifiedObjects.second);
-        if (modifiedObjects.second != null) {
-          // Return the TCatalogObject in the result to indicate this request can be
-          // processed as a direct DDL operation.
-          if (wasRemoved) {
-            resp.getResult().setRemoved_catalog_object_DEPRECATED(thriftTable);
-          } else {
-            resp.getResult().setUpdated_catalog_object_DEPRECATED(thriftTable);
-          }
-        } else {
+        if (modifiedObjects.second == null) {
           // Table does not exist in the meta store and Impala catalog, throw error.
           throw new TableNotFoundException("Table not found: " +
-              req.getTable_name().getDb_name() + "."
-              + req.getTable_name().getTable_name());
+              req.getTable_name().getDb_name() + "." +
+              req.getTable_name().getTable_name());
+        }
+        TCatalogObject thriftTable = modifiedObjects.second.toTCatalogObject();
+        // Return the TCatalogObject in the result to indicate this request can be
+        // processed as a direct DDL operation.
+        if (wasRemoved) {
+          resp.getResult().setRemoved_catalog_object_DEPRECATED(thriftTable);
+        } else {
+          resp.getResult().setUpdated_catalog_object_DEPRECATED(thriftTable);
         }
         resp.getResult().setVersion(thriftTable.getCatalog_version());
       } else {
@@ -2965,8 +2966,7 @@ public class CatalogOpExecutor {
       catalog_.reset();
       resp.result.setVersion(catalog_.getCatalogVersion());
     }
-    resp.getResult().setStatus(
-        new TStatus(TErrorCode.OK, new ArrayList<String>()));
+    resp.getResult().setStatus(new TStatus(TErrorCode.OK, new ArrayList<String>()));
     return resp;
   }
 


[3/5] incubator-impala git commit: IMPALA-3202, IMPALA-2079: rework scratch file I/O

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index dc35600..bf2b7ec 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -15,16 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "runtime/tmp-file-mgr.h"
+
 #include <boost/algorithm/string.hpp>
+#include <boost/filesystem.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/thread/locks.hpp>
-#include <boost/uuid/uuid_io.hpp>
 #include <boost/uuid/random_generator.hpp>
-#include <boost/filesystem.hpp>
-#include <gutil/strings/substitute.h>
+#include <boost/uuid/uuid_io.hpp>
 #include <gutil/strings/join.h>
+#include <gutil/strings/substitute.h>
 
-#include "runtime/tmp-file-mgr.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tmp-file-mgr-internal.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
@@ -32,9 +35,13 @@
 
 #include "common/names.h"
 
+DEFINE_bool(disk_spill_encryption, false,
+    "Set this to encrypt and perform an integrity "
+    "check on all data spilled to disk during a query");
 DEFINE_string(scratch_dirs, "/tmp", "Writable scratch directories");
-
-#include "common/names.h"
+DEFINE_bool(allow_multiple_scratch_dirs_per_device, false,
+    "If false and --scratch_dirs contains multiple directories on the same device, "
+    "then only the first writable directory is used");
 
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
@@ -55,8 +62,10 @@ const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS = "tmp-file-mgr.active-scratch-dir
 const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST =
     "tmp-file-mgr.active-scratch-dirs.list";
 
-TmpFileMgr::TmpFileMgr() : initialized_(false), dir_status_lock_(), tmp_dirs_(),
-  num_active_scratch_dirs_metric_(NULL), active_scratch_dirs_metric_(NULL) {}
+TmpFileMgr::TmpFileMgr()
+  : initialized_(false),
+    num_active_scratch_dirs_metric_(nullptr),
+    active_scratch_dirs_metric_(nullptr) {}
 
 Status TmpFileMgr::Init(MetricGroup* metrics) {
   string tmp_dirs_spec = FLAGS_scratch_dirs;
@@ -65,7 +74,7 @@ Status TmpFileMgr::Init(MetricGroup* metrics) {
   if (!tmp_dirs_spec.empty()) {
     split(all_tmp_dirs, tmp_dirs_spec, is_any_of(","), token_compress_on);
   }
-  return InitCustom(all_tmp_dirs, true, metrics);
+  return InitCustom(all_tmp_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics);
 }
 
 Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_device,
@@ -108,7 +117,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
         if (disk_id >= 0) is_tmp_dir_on_disk[disk_id] = true;
         LOG(INFO) << "Using scratch directory " << scratch_subdir_path.string() << " on "
                   << "disk " << disk_id;
-        tmp_dirs_.push_back(Dir(scratch_subdir_path.string(), false));
+        tmp_dirs_.push_back(scratch_subdir_path.string());
       } else {
         LOG(WARNING) << "Could not remove and recreate directory "
                      << scratch_subdir_path.string() << ": cannot use it for scratch. "
@@ -117,14 +126,14 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
     }
   }
 
-  DCHECK(metrics != NULL);
+  DCHECK(metrics != nullptr);
   num_active_scratch_dirs_metric_ =
       metrics->AddGauge<int64_t>(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
-  active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(metrics,
-      TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
+  active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(
+      metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
   num_active_scratch_dirs_metric_->set_value(tmp_dirs_.size());
   for (int i = 0; i < tmp_dirs_.size(); ++i) {
-    active_scratch_dirs_metric_->Add(tmp_dirs_[i].path());
+    active_scratch_dirs_metric_->Add(tmp_dirs_[i]);
   }
 
   initialized_ = true;
@@ -137,24 +146,20 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
   return Status::OK();
 }
 
-Status TmpFileMgr::NewFile(FileGroup* file_group, const DeviceId& device_id,
-    const TUniqueId& query_id, unique_ptr<File>* new_file) {
+Status TmpFileMgr::NewFile(
+    FileGroup* file_group, DeviceId device_id, unique_ptr<File>* new_file) {
   DCHECK(initialized_);
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size());
-  DCHECK(file_group != NULL);
-  if (IsBlacklisted(device_id)) {
-    return Status(TErrorCode::TMP_DEVICE_BLACKLISTED, tmp_dirs_[device_id].path());
-  }
-
+  DCHECK(file_group != nullptr);
   // Generate the full file path.
   string unique_name = lexical_cast<string>(random_generator()());
   stringstream file_name;
-  file_name << PrintId(query_id) << "_" << unique_name;
-  path new_file_path(tmp_dirs_[device_id].path());
+  file_name << PrintId(file_group->unique_id()) << "_" << unique_name;
+  path new_file_path(tmp_dirs_[device_id]);
   new_file_path /= file_name.str();
 
-  new_file->reset(new File(this, file_group, device_id, new_file_path.string()));
+  new_file->reset(new File(file_group, device_id, new_file_path.string()));
   return Status::OK();
 }
 
@@ -162,162 +167,133 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const {
   DCHECK(initialized_);
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size());
-  return tmp_dirs_[device_id].path();
+  return tmp_dirs_[device_id];
 }
 
-void TmpFileMgr::BlacklistDevice(DeviceId device_id) {
+int TmpFileMgr::NumActiveTmpDevices() {
   DCHECK(initialized_);
-  DCHECK(device_id >= 0 && device_id < tmp_dirs_.size());
-  bool added;
-  {
-    lock_guard<SpinLock> l(dir_status_lock_);
-    added = tmp_dirs_[device_id].blacklist();
-  }
-  if (added) {
-    num_active_scratch_dirs_metric_->Increment(-1);
-    active_scratch_dirs_metric_->Remove(tmp_dirs_[device_id].path());
-  }
+  return tmp_dirs_.size();
 }
 
-bool TmpFileMgr::IsBlacklisted(DeviceId device_id) {
-  DCHECK(initialized_);
-  DCHECK(device_id >= 0 && device_id < tmp_dirs_.size());
-  lock_guard<SpinLock> l(dir_status_lock_);
-  return tmp_dirs_[device_id].is_blacklisted();
-}
-
-int TmpFileMgr::num_active_tmp_devices() {
-  DCHECK(initialized_);
-  lock_guard<SpinLock> l(dir_status_lock_);
-  int num_active = 0;
-  for (int device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
-    if (!tmp_dirs_[device_id].is_blacklisted()) ++num_active;
-  }
-  return num_active;
-}
-
-vector<TmpFileMgr::DeviceId> TmpFileMgr::active_tmp_devices() {
+vector<TmpFileMgr::DeviceId> TmpFileMgr::ActiveTmpDevices() {
   vector<TmpFileMgr::DeviceId> devices;
-  // Allocate vector before we grab lock
-  devices.reserve(tmp_dirs_.size());
-  {
-    lock_guard<SpinLock> l(dir_status_lock_);
-    for (DeviceId device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
-      if (!tmp_dirs_[device_id].is_blacklisted()) {
-        devices.push_back(device_id);
-      }
-    }
+  for (DeviceId device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
+    devices.push_back(device_id);
   }
   return devices;
 }
 
-TmpFileMgr::File::File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id,
-    const string& path)
-  : mgr_(mgr),
-    file_group_(file_group),
+TmpFileMgr::File::File(FileGroup* file_group, DeviceId device_id, const string& path)
+  : file_group_(file_group),
     path_(path),
     device_id_(device_id),
-    current_size_(0),
+    disk_id_(DiskInfo::disk_id(path.c_str())),
+    bytes_allocated_(0),
     blacklisted_(false) {
-  DCHECK(file_group != NULL);
+  DCHECK(file_group != nullptr);
 }
 
 Status TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) {
   DCHECK_GT(num_bytes, 0);
-  Status status;
-  if (mgr_->IsBlacklisted(device_id_)) {
-    blacklisted_ = true;
-    return Status(TErrorCode::TMP_FILE_BLACKLISTED, path_);
-  }
-  if (current_size_ == 0) {
-    // First call to AllocateSpace. Create the file.
-    status = FileSystemUtil::CreateFile(path_);
-    if (!status.ok()) {
-      ReportIOError(status.msg());
-      return status;
-    }
-    disk_id_ = DiskInfo::disk_id(path_.c_str());
-  }
-  int64_t new_size = current_size_ + num_bytes;
-  status = FileSystemUtil::ResizeFile(path_, new_size);
-  if (!status.ok()) {
-    ReportIOError(status.msg());
-    return status;
-  }
-  *offset = current_size_;
-  current_size_ = new_size;
+  *offset = bytes_allocated_;
+  bytes_allocated_ += num_bytes;
   return Status::OK();
 }
 
-void TmpFileMgr::File::ReportIOError(const ErrorMsg& msg) {
+int TmpFileMgr::File::AssignDiskQueue() const {
+  return file_group_->io_mgr_->AssignQueue(path_.c_str(), disk_id_, false);
+}
+
+void TmpFileMgr::File::Blacklist(const ErrorMsg& msg) {
   LOG(ERROR) << "Error for temporary file '" << path_ << "': " << msg.msg();
-  // IMPALA-2305: avoid blacklisting to prevent test failures.
-  // blacklisted_ = true;
-  // mgr_->BlacklistDevice(device_id_);
+  blacklisted_ = true;
 }
 
 Status TmpFileMgr::File::Remove() {
-  if (current_size_ > 0) FileSystemUtil::RemovePaths(vector<string>(1, path_));
+  // Remove the file if present (it may not be present if no writes completed).
+  FileSystemUtil::RemovePaths({path_});
   return Status::OK();
 }
 
-TmpFileMgr::FileGroup::FileGroup(
-    TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, int64_t bytes_limit)
+string TmpFileMgr::File::DebugString() {
+  return Substitute("File $0 path '$1' device id $2 disk id $3 bytes allocated $4 "
+      "blacklisted $5", this, path_, device_id_, disk_id_, bytes_allocated_,
+      blacklisted_);
+}
+
+TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
+    RuntimeProfile* profile, const TUniqueId& unique_id, int64_t block_size,
+    int64_t bytes_limit)
   : tmp_file_mgr_(tmp_file_mgr),
-    current_bytes_allocated_(0),
+    io_mgr_(io_mgr),
+    io_ctx_(nullptr),
+    unique_id_(unique_id),
+    block_size_(block_size),
     bytes_limit_(bytes_limit),
+    write_counter_(ADD_COUNTER(profile, "ScratchWrites", TUnit::UNIT)),
+    bytes_written_counter_(ADD_COUNTER(profile, "ScratchBytesWritten", TUnit::BYTES)),
+    read_counter_(ADD_COUNTER(profile, "ScratchReads", TUnit::UNIT)),
+    bytes_read_counter_(ADD_COUNTER(profile, "ScratchBytesRead", TUnit::BYTES)),
+    scratch_space_bytes_used_counter_(
+        ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES)),
+    disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")),
+    encryption_timer_(ADD_TIMER(profile, "TotalEncryptionTime")),
+    current_bytes_allocated_(0),
     next_allocation_index_(0) {
-  DCHECK(tmp_file_mgr != NULL);
-  scratch_space_bytes_used_counter_ =
-      ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES);
+  DCHECK_GT(block_size_, 0);
+  DCHECK(tmp_file_mgr != nullptr);
+  io_mgr_->RegisterContext(&io_ctx_, nullptr);
 }
 
-Status TmpFileMgr::FileGroup::CreateFiles(const TUniqueId& query_id) {
+TmpFileMgr::FileGroup::~FileGroup() {
+  DCHECK_EQ(tmp_files_.size(), 0);
+}
+
+Status TmpFileMgr::FileGroup::CreateFiles() {
+  lock_.DCheckLocked();
   DCHECK(tmp_files_.empty());
-  vector<Status> errs;
-  vector<DeviceId> tmp_devices = tmp_file_mgr_->active_tmp_devices();
+  vector<DeviceId> tmp_devices = tmp_file_mgr_->ActiveTmpDevices();
   int files_allocated = 0;
   // Initialize the tmp files and the initial file to use.
   for (int i = 0; i < tmp_devices.size(); ++i) {
-    TmpFileMgr::DeviceId tmp_device_id = tmp_devices[i];
+    TmpFileMgr::DeviceId device_id = tmp_devices[i];
     // It is possible for a device to be blacklisted after it was returned by
-    // active_tmp_devices(), handle this gracefully by skipping devices if NewFile()
+    // ActiveTmpDevices(), handle this gracefully by skipping devices if NewFile()
     // fails.
-    Status status = NewFile(tmp_device_id, query_id);
+    unique_ptr<TmpFileMgr::File> tmp_file;
+    Status status = tmp_file_mgr_->NewFile(this, device_id, &tmp_file);
     if (status.ok()) {
+      tmp_files_.emplace_back(std::move(tmp_file));
       ++files_allocated;
     } else {
-      errs.push_back(std::move(status));
+      scratch_errors_.push_back(std::move(status));
     }
   }
   DCHECK_EQ(tmp_files_.size(), files_allocated);
   if (tmp_files_.size() == 0) {
-    Status err_status("Could not create files in any configured scratch directories "
-        "(--scratch_dirs).");
-    for (Status& err : errs) err_status.MergeStatus(err);
+    // TODO: IMPALA-4697: the merged errors do not show up in the query error log,
+    // so we must point users to the impalad error log.
+    Status err_status(
+        "Could not create files in any configured scratch directories (--scratch_dirs). "
+        "See logs for previous errors that may have caused this.");
+    for (Status& err : scratch_errors_) err_status.MergeStatus(err);
     return err_status;
   }
-
   // Start allocating on a random device to avoid overloading the first device.
   next_allocation_index_ = rand() % tmp_files_.size();
   return Status::OK();
 }
 
-Status TmpFileMgr::FileGroup::NewFile(const DeviceId& device_id,
-    const TUniqueId& query_id, File** new_file) {
-  unique_ptr<TmpFileMgr::File> tmp_file;
-  RETURN_IF_ERROR(tmp_file_mgr_->NewFile(this, device_id, query_id, &tmp_file));
-  if (new_file != NULL) *new_file = tmp_file.get();
-  tmp_files_.emplace_back(std::move(tmp_file));
-  return Status::OK();
-}
-
 void TmpFileMgr::FileGroup::Close() {
-  for (std::unique_ptr<TmpFileMgr::File>& file: tmp_files_) {
+  // Cancel writes before deleting the files, since in-flight writes could re-create
+  // deleted files.
+  if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_);
+  io_ctx_ = nullptr;
+  for (std::unique_ptr<TmpFileMgr::File>& file : tmp_files_) {
     Status status = file->Remove();
     if (!status.ok()) {
-      LOG(WARNING) << "Error removing scratch file '" << file->path() << "': "
-                   << status.msg().msg();
+      LOG(WARNING) << "Error removing scratch file '" << file->path()
+                   << "': " << status.msg().msg();
     }
   }
   tmp_files_.clear();
@@ -325,18 +301,31 @@ void TmpFileMgr::FileGroup::Close() {
 
 Status TmpFileMgr::FileGroup::AllocateSpace(
     int64_t num_bytes, File** tmp_file, int64_t* file_offset) {
-  if (bytes_limit_ != -1 && current_bytes_allocated_ + num_bytes > bytes_limit_) {
+  DCHECK_LE(num_bytes, block_size_);
+  lock_guard<SpinLock> lock(lock_);
+
+  if (!free_ranges_.empty()) {
+    *tmp_file = free_ranges_.back().first;
+    *file_offset = free_ranges_.back().second;
+    free_ranges_.pop_back();
+    return Status::OK();
+  }
+
+  if (bytes_limit_ != -1 && current_bytes_allocated_ + block_size_ > bytes_limit_) {
     return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_);
   }
-  vector<Status> errs;
+
+  // Lazily create the files on the first write.
+  if (tmp_files_.empty()) RETURN_IF_ERROR(CreateFiles());
+
   // Find the next physical file in round-robin order and allocate a range from it.
   for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) {
     *tmp_file = tmp_files_[next_allocation_index_].get();
     next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
     if ((*tmp_file)->is_blacklisted()) continue;
-    Status status = (*tmp_file)->AllocateSpace(num_bytes, file_offset);
+    Status status = (*tmp_file)->AllocateSpace(block_size_, file_offset);
     if (status.ok()) {
-      scratch_space_bytes_used_counter_->Add(num_bytes);
+      scratch_space_bytes_used_counter_->Add(block_size_);
       current_bytes_allocated_ += num_bytes;
       return Status::OK();
     }
@@ -345,12 +334,261 @@ Status TmpFileMgr::FileGroup::AllocateSpace(
     LOG(WARNING) << "Error while allocating range in scratch file '"
                  << (*tmp_file)->path() << "': " << status.msg().msg()
                  << ". Will try another scratch file.";
-    errs.push_back(status);
+    scratch_errors_.push_back(status);
   }
-  Status err_status("No usable scratch files: space could not be allocated in any "
-                    "of the configured scratch directories (--scratch_dirs).");
-  for (Status& err : errs) err_status.MergeStatus(err);
+  // TODO: IMPALA-4697: the merged errors do not show up in the query error log,
+  // so we must point users to the impalad error log.
+  Status err_status(
+      "No usable scratch files: space could not be allocated in any of "
+      "the configured scratch directories (--scratch_dirs). See logs for previous "
+      "errors that may have caused this.");
+  // Include all previous errors that may have caused the failure.
+  for (Status& err : scratch_errors_) err_status.MergeStatus(err);
   return err_status;
 }
 
+void TmpFileMgr::FileGroup::AddFreeRange(File* file, int64_t offset) {
+  lock_guard<SpinLock> guard(lock_); // 'free_ranges_' is protected by 'lock_'.
+  free_ranges_.push_back({file, offset}); // AllocateSpace() pops from the back.
+}
+
+Status TmpFileMgr::FileGroup::Write(
+    MemRange buffer, WriteDoneCallback cb, unique_ptr<TmpFileMgr::WriteHandle>* handle) {
+  DCHECK_GE(buffer.len(), 0);
+
+  // Reserve a scratch range first; no handle is created if that fails.
+  File* scratch_file;
+  int64_t scratch_offset;
+  RETURN_IF_ERROR(AllocateSpace(buffer.len(), &scratch_file, &scratch_offset));
+  unique_ptr<WriteHandle> new_handle(new WriteHandle(encryption_timer_, cb));
+  WriteHandle* raw_handle = new_handle.get(); // Captured by value in the lambda.
+  DiskIoMgr::WriteRange::WriteDoneCallback io_done = [this, raw_handle](
+      const Status& io_status) { WriteComplete(raw_handle, io_status); };
+  RETURN_IF_ERROR(new_handle->Write(
+      io_mgr_, io_ctx_, scratch_file, scratch_offset, buffer, io_done));
+  write_counter_->Add(1);
+  bytes_written_counter_->Add(buffer.len());
+  *handle = move(new_handle);
+  return Status::OK();
+}
+
+Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
+  DCHECK(handle->write_range_ != nullptr);
+  DCHECK(!handle->is_cancelled_);
+  DCHECK_EQ(buffer.len(), handle->len());
+
+  // Don't grab 'lock_' in this method - it is not necessary because we don't touch
+  // any members that it protects and could block other threads for the duration of
+  // the synchronous read.
+  DCHECK(!handle->write_in_flight_);
+  // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
+  // since the write is not in flight.
+  DiskIoMgr::ScanRange* scan_range = scan_range_pool_.Add(new DiskIoMgr::ScanRange);
+  scan_range->Reset(nullptr, handle->write_range_->file(), handle->write_range_->len(),
+      handle->write_range_->offset(), handle->write_range_->disk_id(), false,
+      DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len()));
+  DiskIoMgr::BufferDescriptor* io_mgr_buffer = nullptr;
+  {
+    SCOPED_TIMER(disk_read_timer_);
+    read_counter_->Add(1);
+    bytes_read_counter_->Add(buffer.len());
+    RETURN_IF_ERROR(io_mgr_->Read(io_ctx_, scan_range, &io_mgr_buffer));
+  }
+
+  DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data());
+  DCHECK_EQ(io_mgr_buffer->len(), buffer.len());
+  DCHECK(io_mgr_buffer->eosr());
+
+  // Verify and decrypt in place, but never return before handing the descriptor
+  // back: an early return on a verification failure would skip Return() and leak it.
+  Status status;
+  if (FLAGS_disk_spill_encryption) status = handle->CheckHashAndDecrypt(buffer);
+  io_mgr_buffer->Return();
+  return status;
+}
+
+Status TmpFileMgr::FileGroup::CancelWriteAndRestoreData(
+    unique_ptr<WriteHandle> handle, MemRange buffer) {
+  DCHECK_EQ(handle->write_range_->data(), buffer.data());
+  DCHECK_EQ(handle->len(), buffer.len());
+  handle->Cancel();
+
+  // Restore the plaintext without first waiting for an in-flight write to drain. The
+  // write may then put bogus data on disk, but its range is released below, and the
+  // decryption overlaps with the time spent waiting for the write to finish.
+  Status restore_status;
+  if (FLAGS_disk_spill_encryption) {
+    restore_status = handle->CheckHashAndDecrypt(buffer);
+  }
+  handle->WaitForWrite();
+  AddFreeRange(handle->file_, handle->write_range_->offset());
+  handle.reset();
+  return restore_status;
+}
+
+void TmpFileMgr::FileGroup::DestroyWriteHandle(unique_ptr<WriteHandle> handle) {
+  handle->Cancel(); // Stops RecoverWriteError() from retrying a failed write.
+  handle->WaitForWrite(); // Blocks until no write is in flight for 'handle'.
+  AddFreeRange(handle->file_, handle->write_range_->offset()); // Recycle the range.
+  handle.reset();
+}
+
+void TmpFileMgr::FileGroup::WriteComplete(
+    WriteHandle* handle, const Status& write_status) {
+  // A successful write is reported to the handle as-is.
+  if (write_status.ok()) {
+    handle->WriteComplete(write_status);
+    return;
+  }
+  // Try to reissue the failed write; only report completion if that is not possible.
+  Status recovery_status = RecoverWriteError(handle, write_status);
+  if (!recovery_status.ok()) handle->WriteComplete(recovery_status);
+}
+
+Status TmpFileMgr::FileGroup::RecoverWriteError(
+    WriteHandle* handle, const Status& write_status) {
+  DCHECK(!write_status.ok());
+  DCHECK(handle->file_ != nullptr);
+
+  // Cancellation and mem-limit errors are not recoverable: return them unchanged.
+  if (write_status.IsCancelled() || write_status.IsMemLimitExceeded()) {
+    return write_status;
+  }
+
+  // Save the error; AllocateSpace() merges 'scratch_errors_' into its failure status.
+  {
+    lock_guard<SpinLock> lock(lock_);
+    scratch_errors_.push_back(write_status);
+  }
+  handle->file_->Blacklist(write_status.msg());
+
+  // A cancelled write is neither retried nor reported as failed: return CANCELLED.
+  if (handle->is_cancelled_) return Status::CANCELLED; // NOTE(review): read unlocked.
+
+  TmpFileMgr::File* tmp_file;
+  int64_t file_offset;
+  // Allocate a replacement range; the old range of the bad file is never freed here.
+  // NOTE(review): AllocateSpace() skips blacklisted files in its round-robin loop but
+  // not when reusing 'free_ranges_', so a range of a bad file may still be handed out.
+  RETURN_IF_ERROR(AllocateSpace(handle->len(), &tmp_file, &file_offset));
+  return handle->RetryWrite(io_mgr_, io_ctx_, tmp_file, file_offset);
+}
+
+string TmpFileMgr::FileGroup::DebugString() {
+  lock_guard<SpinLock> lock(lock_); // Guards 'tmp_files_' and the allocation state.
+  stringstream ss;
+  ss << "FileGroup " << this << " block size " << block_size_
+     << " bytes limit " << bytes_limit_
+     << " current bytes allocated " << current_bytes_allocated_
+     << " next allocation index " << next_allocation_index_
+     << " writes " << write_counter_->value()
+     << " bytes written " << bytes_written_counter_->value()
+     << " reads " << read_counter_->value()
+     << " bytes read " << bytes_read_counter_->value()
+     << " scratch bytes used " << scratch_space_bytes_used_counter_->value()
+     << " disk read timer " << disk_read_timer_->value()
+     << " encryption timer " << encryption_timer_->value() << endl
+     << "  " << tmp_files_.size() << " files:" << endl;
+  for (unique_ptr<File>& file : tmp_files_) {
+    ss << "    " << file->DebugString() << endl;
+  }
+  return ss.str();
+}
+
+TmpFileMgr::WriteHandle::WriteHandle(
+    RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb)
+  : cb_(move(cb)), // 'cb' is a by-value sink parameter: move it instead of copying.
+    encryption_timer_(encryption_timer),
+    file_(nullptr), // Assigned by Write().
+    is_cancelled_(false),
+    write_in_flight_(false) {}
+
+string TmpFileMgr::WriteHandle::TmpFilePath() const {
+  // A handle with no backing file assigned reports an empty path.
+  return file_ == nullptr ? "" : file_->path();
+}
+
+Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx,
+    File* file, int64_t offset, MemRange buffer,
+    DiskIoMgr::WriteRange::WriteDoneCallback callback) {
+  DCHECK(!write_in_flight_);
+  // Encrypts 'buffer' in place (if enabled) before the write is queued.
+  if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer));
+  // NOTE(review): if this returns an error, ~WriteHandle()'s DCHECKs can fire.
+  file_ = file;
+  write_in_flight_ = true;
+  write_range_.reset(
+      new DiskIoMgr::WriteRange(file->path(), offset, file->AssignDiskQueue(), callback));
+  write_range_->SetData(buffer.data(), buffer.len());
+  return io_mgr->AddWriteRange(io_ctx, write_range_.get());
+}
+
+Status TmpFileMgr::WriteHandle::RetryWrite(
+    DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset) {
+  DCHECK(write_in_flight_); // Only called for a write that is still in flight.
+  file_ = file;
+  write_in_flight_ = true;
+  write_range_->SetRange(file->path(), offset, file->AssignDiskQueue()); // New target.
+  return io_mgr->AddWriteRange(io_ctx, write_range_.get());
+}
+
+void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) {
+  WriteDoneCallback cb;
+  {
+    lock_guard<mutex> lock(write_state_lock_);
+    DCHECK(write_in_flight_);
+    write_in_flight_ = false;
+    // Once 'write_in_flight_' is false and the lock is released, a thread returning
+    // from WaitForWrite() may destroy 'this': extract 'cb_' and notify while locked.
+    cb = move(cb_);
+    write_complete_cv_.NotifyAll();
+  }
+  // Call 'cb' once we've updated the state. We must do this last because once 'cb' is
+  // called, it is valid to call Read() on the handle. Do not touch 'this' from here on.
+  cb(write_status);
+}
+
+void TmpFileMgr::WriteHandle::Cancel() {
+  lock_guard<mutex> state_guard(write_state_lock_);
+  is_cancelled_ = true; // Stops RecoverWriteError() from retrying the write.
+  // TODO: in future, if DiskIoMgr supported cancellation, we could cancel it here.
+}
+
+void TmpFileMgr::WriteHandle::WaitForWrite() {
+  unique_lock<mutex> guard(write_state_lock_);
+  while (write_in_flight_) write_complete_cv_.Wait(guard); // Until WriteComplete().
+}
+
+Status TmpFileMgr::WriteHandle::EncryptAndHash(MemRange buffer) {
+  DCHECK(FLAGS_disk_spill_encryption);
+  SCOPED_TIMER(encryption_timer_);
+  // AES-CFB must never see the same key/IV pair twice, so every buffer that is
+  // written gets a freshly generated key and IV.
+  key_.InitializeRandom();
+  Status encrypt_status = key_.Encrypt(buffer.data(), buffer.len(), buffer.data());
+  if (encrypt_status.ok()) hash_.Compute(buffer.data(), buffer.len());
+  return encrypt_status;
+}
+
+Status TmpFileMgr::WriteHandle::CheckHashAndDecrypt(MemRange buffer) {
+  DCHECK(FLAGS_disk_spill_encryption);
+  SCOPED_TIMER(encryption_timer_);
+  // The hash covers the encrypted bytes, so verify it before decrypting in place.
+  const bool hash_matches = hash_.Verify(buffer.data(), buffer.len());
+  if (!hash_matches) return Status("Block verification failure");
+  return key_.Decrypt(buffer.data(), buffer.len(), buffer.data());
+}
+
+string TmpFileMgr::WriteHandle::DebugString() {
+  unique_lock<mutex> lock(write_state_lock_); // Guards the two flags printed below.
+  stringstream ss; // TmpFilePath() is used because 'file_' is nullptr before Write().
+  ss << "Write handle " << this << " file '" << TmpFilePath() << "'"
+     << " is cancelled " << is_cancelled_ << " write in flight " << write_in_flight_;
+  if (write_range_ != nullptr) {
+    ss << " data " << write_range_->data() << " len " << write_range_->len()
+       << " file offset " << write_range_->offset()
+       << " disk id " << write_range_->disk_id();
+  }
+  return ss.str();
+}
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 3c489b2..0c3e974 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -18,146 +18,211 @@
 #ifndef IMPALA_RUNTIME_TMP_FILE_MGR_H
 #define IMPALA_RUNTIME_TMP_FILE_MGR_H
 
+#include <functional>
+#include <utility>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/object-pool.h"
 #include "common/status.h"
 #include "gen-cpp/Types_types.h" // for TUniqueId
+#include "runtime/disk-io-mgr.h"
+#include "util/mem-range.h"
 #include "util/collection-metrics.h"
+#include "util/condition-variable.h"
+#include "util/openssl-util.h"
 #include "util/runtime-profile.h"
 #include "util/spinlock.h"
 
 namespace impala {
 
-/// TmpFileMgr creates and manages temporary files and directories on the local
-/// filesystem. It can manage multiple temporary directories across multiple devices.
-/// TmpFileMgr ensures that at most one directory per device is used unless overridden
-/// for testing.
+/// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files
+/// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch
+/// directories across multiple devices, configured via the --scratch_dirs option.
+/// TmpFileMgr manages I/O to scratch files in order to abstract away details of which
+/// files are allocated and recovery from certain I/O errors. I/O is done via DiskIoMgr.
+/// TmpFileMgr encrypts data written to disk if enabled by the --disk_spill_encryption
+/// command-line flag.
+///
+/// FileGroups manage scratch space across multiple devices. To write to scratch space,
+/// first a FileGroup is created, then FileGroup::Write() is called to asynchronously
+/// write a memory buffer to one of the scratch files. FileGroup::Write() returns a
+/// WriteHandle, which is used by the caller to identify that write operation. The
+/// caller is notified when the asynchronous write completes via a callback, after which
+/// the caller can use the WriteHandle to read back the data.
 ///
-/// Every temporary File belongs to a FileGroup: to allocate temporary files, first a
-/// FileGroup is created, then FileGroup::NewFile() is called to create a new File with
-/// a unique filename on the specified temporary device. The client can use the File
-/// handle to allocate space in the file. FileGroups can be created with a limit on
-/// the total number of bytes allocated across all files in the group.
+/// Each WriteHandle is backed by a range of data in a scratch file. The first call to
+/// Write() will create files for the FileGroup with unique filenames on the configured
+/// temporary devices. At most one directory per device is used (unless overridden for
+/// testing). Free space is managed within a FileGroup: once a WriteHandle is destroyed,
+/// the file range backing it can be recycled for a different WriteHandle. The file range
+/// of a WriteHandle can be replaced with a different one if a write error is encountered
+/// and the data instead needs to be written to a different disk.
 ///
-/// TODO: we could notify block managers about the failure so they can more take
-/// proactive action to avoid using the device.
+/// Resource Management:
+/// TmpFileMgr provides some basic support for managing local disk space consumption.
+/// A FileGroup can be created with a limit on the total number of bytes allocated across
+/// all files. Writes that would exceed the limit fail with an error status.
+///
+/// TODO: each FileGroup can manage only fixed length scratch file ranges of 'block_size',
+/// to simplify the recycling logic. BufferPool will require variable length ranges.
+/// TODO: IMPALA-4683: we could implement smarter handling of failures, e.g. to
+/// temporarily blacklist devices that show I/O errors.
 class TmpFileMgr {
  public:
-  class FileGroup;
+  class File; // Needs to be public for TmpFileMgrTest.
+  class WriteHandle;
 
-  /// DeviceId is a unique identifier for a temporary device managed by TmpFileMgr.
-  /// It is used as a handle for external classes to identify devices.
+  /// DeviceId is an internal unique identifier for a temporary device managed by
+  /// TmpFileMgr. DeviceIds in the range [0, num tmp devices) are allocated arbitrarily.
+  /// Needs to be public for TmpFileMgrTest.
   typedef int DeviceId;
 
-  /// File is a handle to a physical file in a temporary directory. Clients
-  /// can allocate file space and remove files using AllocateSpace() and Remove().
-  /// Creation of the file is deferred until the first call to AllocateSpace().
-  class File {
+  typedef std::function<void(const Status&)> WriteDoneCallback;
+
+  /// Represents a group of temporary files - one per disk with a scratch directory. The
+  /// total allocated bytes of the group can be bound by setting the space allocation
+  /// limit. The owner of the FileGroup object is responsible for calling the Close()
+  /// method to delete all the files in the group.
+  ///
+  /// Public methods of FileGroup and WriteHandle are safe to call concurrently from
+  /// multiple threads as long as different WriteHandle arguments are provided.
+  class FileGroup {
    public:
-    /// Called to notify TmpFileMgr that an IO error was encountered for this file
-    void ReportIOError(const ErrorMsg& msg);
+    /// Initialize a new file group, which will create files using 'tmp_file_mgr'
+    /// and perform I/O using 'io_mgr'. Adds counters to 'profile' to track scratch
+    /// space used. 'unique_id' is a unique ID that is used to prefix any scratch file
+    /// names. It is an error to create multiple FileGroups with the same 'unique_id'.
+    /// 'block_size' is the size of blocks in bytes that space will be allocated in.
+    /// 'bytes_limit' is the limit on the total file space to allocate.
+    FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, RuntimeProfile* profile,
+        const TUniqueId& unique_id, int64_t block_size, int64_t bytes_limit = -1);
+
+    ~FileGroup();
+
+    /// Asynchronously writes 'buffer' to a temporary file of this file group. If there
+    /// are multiple scratch files, this can write to any of them, and will attempt to
+    /// recover from I/O errors on one file by writing to a different file. The memory
+    /// referenced by 'buffer' must remain valid until the write completes. The callee
+    /// may rewrite the data in 'buffer' in-place (e.g. to do in-place encryption or
+    /// compression). The caller should not modify the data in 'buffer' until the write
+    /// completes or is cancelled, otherwise invalid data may be written to disk.
+    ///
+    /// TODO: buffer->len must be <= 'block_size' until FileGroup supports allocating
+    /// variable-length scratch file ranges.
+    ///
+    /// Returns an error if the scratch space cannot be allocated or the write cannot
+    /// be started. Otherwise 'handle' is set and 'cb' will be called asynchronously from
+    /// a different thread when the write completes successfully or unsuccessfully or is
+    /// cancelled.
+    ///
+    /// 'handle' must be destroyed by passing it to DestroyWriteHandle() or
+    /// CancelWriteAndRestoreData().
+    Status Write(
+        MemRange buffer, WriteDoneCallback cb, std::unique_ptr<WriteHandle>* handle);
+
+    /// Synchronously read the data referenced by 'handle' from the temporary file into
+    /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
+    /// after a write successfully completes.
+    Status Read(WriteHandle* handle, MemRange buffer);
+
+    /// Cancels the write referenced by 'handle' and destroys associated resources.
+    /// Also restores the original data in the 'buffer' passed to Write(), decrypting or
+    /// decompressing as necessary. The cancellation always succeeds, but an error
+    /// is returned if restoring the data fails.
+    Status CancelWriteAndRestoreData(
+        std::unique_ptr<WriteHandle> handle, MemRange buffer);
+
+    /// Wait for the in-flight I/Os to complete and destroy resources associated with
+    /// 'handle'.
+    void DestroyWriteHandle(std::unique_ptr<WriteHandle> handle);
+
+    /// Calls Remove() on all the files in the group and deletes them.
+    void Close();
+
+    std::string DebugString();
 
-    const std::string& path() const { return path_; }
-    int disk_id() const { return disk_id_; }
-    bool is_blacklisted() const { return blacklisted_; }
+    const TUniqueId& unique_id() const { return unique_id_; }
 
    private:
-    friend class FileGroup;
-    friend class TmpFileMgr;
+    friend class File;
     friend class TmpFileMgrTest;
 
-    /// Allocates 'num_bytes' bytes in this file for a new block of data.
-    /// The file size is increased by a call to truncate() if necessary.
-    /// The physical file is created on the first call to AllocateSpace().
-    /// Returns Status::OK() and sets offset on success.
-    /// Returns an error status if an unexpected error occurs, e.g. the file could not
-    /// be created.
-    Status AllocateSpace(int64_t num_bytes, int64_t* offset);
-
-    /// Delete the physical file on disk, if one was created.
-    /// It is not valid to read or write to a file after calling Remove().
-    Status Remove();
+    /// Initializes the file group with one temporary file per disk with a scratch
+    /// directory. Returns OK if at least one temporary file could be created.
+    /// Returns an error if no temporary files were successfully created. Must only be
+    /// called once. Must be called with 'lock_' held.
+    Status CreateFiles();
 
-    /// The name of the sub-directory that Impala created within each configured scratch
-    /// directory.
-    const static std::string TMP_SUB_DIR_NAME;
+    /// Allocate 'num_bytes' bytes in a temporary file. Try multiple disks if error
+    /// occurs. Returns an error only if no temporary files are usable or the scratch
+    /// limit is exceeded. Must be called without 'lock_' held.
+    Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset);
 
-    /// Space (in MB) that must ideally be available for writing on a scratch
-    /// directory. A warning is issued if available space is less than this threshold.
-    const static uint64_t AVAILABLE_SPACE_THRESHOLD_MB;
+    /// Add a free scratch range to 'free_ranges_'. Must be called without 'lock_' held.
+    void AddFreeRange(File* file, int64_t offset);
 
-    File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id,
-        const std::string& path);
+    /// Called when the DiskIoMgr write completes for 'handle'. On error, will attempt
+    /// to retry the write. On success or if the write can't be retried, calls
+    /// handle->WriteComplete().
+    void WriteComplete(WriteHandle* handle, const Status& write_status);
 
-    /// TmpFileMgr this belongs to.
-    TmpFileMgr* mgr_;
+    /// Handles a write error. Logs the write error and blacklists the device for this
+    /// file group if the cause was an I/O error. Blacklisting limits the number of times
+    /// a write is retried because each device will only be tried once. Returns OK if it
+    /// successfully reissued the write. Returns an error status if the original error
+    /// was unrecoverable or an unrecoverable error is encountered when reissuing the
+    /// write. The error status will include all previous I/O errors in its details.
+    Status RecoverWriteError(WriteHandle* handle, const Status& write_status);
 
-    /// The FileGroup this belongs to. Cannot be null.
-    FileGroup* file_group_;
+    /// The TmpFileMgr it is associated with.
+    TmpFileMgr* const tmp_file_mgr_;
 
-    /// Path of the physical file in the filesystem.
-    std::string path_;
+    /// DiskIoMgr used for all I/O to temporary files.
+    DiskIoMgr* const io_mgr_;
 
-    /// The temporary device this file is stored on.
-    DeviceId device_id_;
+    /// I/O context used for all reads and writes. Registered in constructor.
+    DiskIoRequestContext* io_ctx_;
 
-    /// The id of the disk on which the physical file lies.
-    int disk_id_;
+    /// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be
+    /// touched by DiskIoMgr even after the scan is finished.
+    /// TODO: IMPALA-4249: remove once lifetime of ScanRange objects is better defined.
+    ObjectPool scan_range_pool_;
 
-    /// Current file size. Modified by AllocateSpace(). Size is 0 before file creation.
-    int64_t current_size_;
+    /// Unique across all FileGroups. Used to prefix file names.
+    const TUniqueId unique_id_;
 
-    /// Set to true to indicate that file can't be expanded. This is useful to keep here
-    /// even though it is redundant with the global per-device blacklisting in TmpFileMgr
-    /// because it can be checked without acquiring a global lock. If a file is
-    /// blacklisted, the corresponding device will always be blacklisted.
-    bool blacklisted_;
-  };
+    /// Size of the blocks in bytes that scratch space is managed in.
+    /// TODO: support variable-length scratch file ranges.
+    const int64_t block_size_;
 
-  /// Represents a group of temporary files - one per disk with a scratch directory. The
-  /// total allocated bytes of the group can be bound by setting the space allocation
-  /// limit. The owner of the FileGroup object is responsible for calling the Close()
-  /// method to delete all the files in the group.
-  class FileGroup {
-   public:
-    /// Initialize a new file group, which will create files using 'tmp_file_mgr'.
-    /// Adds counters to 'profile' to track scratch space used. 'bytes_limit' is
-    /// the limit on the total file space to allocate.
-    FileGroup(
-        TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, int64_t bytes_limit = -1);
+    /// Max write space allowed (-1 means no limit).
+    const int64_t bytes_limit_;
 
-    ~FileGroup() { DCHECK_EQ(NumFiles(), 0); }
+    /// Number of write operations (includes writes started but not yet complete).
+    RuntimeProfile::Counter* const write_counter_;
 
-    /// Initializes the file group with one temporary file per disk with a scratch
-    /// directory. 'unique_id' is a unique ID that should be used to prefix any
-    /// scratch file names. It is an error to create multiple FileGroups with the
-    /// same 'unique_id'. Returns OK if at least one temporary file could be created.
-    /// Returns an error if no temporary files were successfully created. Must only be
-    /// called once.
-    Status CreateFiles(const TUniqueId& unique_id);
+    /// Number of bytes written to disk (includes writes started but not yet complete).
+    RuntimeProfile::Counter* const bytes_written_counter_;
 
-    /// Allocate num_bytes bytes in a temporary file. Try multiple disks if error occurs.
-    /// Returns an error only if no temporary files are usable or the scratch limit is
-    /// exceeded.
-    Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset);
+    /// Number of read operations (includes reads started but not yet complete).
+    RuntimeProfile::Counter* const read_counter_;
 
-    /// Calls Remove() on all the files in the group and deletes them.
-    void Close();
+    /// Number of bytes read from disk (includes reads started but not yet complete).
+    RuntimeProfile::Counter* const bytes_read_counter_;
 
-    /// Returns the number of files that are a part of the group.
-    int NumFiles() { return tmp_files_.size(); }
+    /// Amount of scratch space allocated in bytes.
+    RuntimeProfile::Counter* const scratch_space_bytes_used_counter_;
 
-   private:
-    friend class TmpFileMgrTest;
+    /// Time taken for disk reads.
+    RuntimeProfile::Counter* const disk_read_timer_;
 
-    /// Creates a new File with a unique path for a query instance, adds it to the
-    /// group and returns a handle for that file. The file path is within the (single)
-    /// tmp directory on the specified device id.
-    /// If an error is encountered, e.g. the device is blacklisted, the file is not
-    /// added to this group and a non-ok status is returned.
-    Status NewFile(
-        const DeviceId& device_id, const TUniqueId& unique_id, File** new_file = NULL);
+    /// Time spent in disk spill encryption, decryption, and integrity checking.
+    RuntimeProfile::Counter* encryption_timer_;
 
-    /// The TmpFileMgr it is associated with.
-    TmpFileMgr* tmp_file_mgr_;
+    /// Protects below members.
+    SpinLock lock_;
 
     /// List of files representing the FileGroup.
     std::vector<std::unique_ptr<File>> tmp_files_;
@@ -165,16 +230,117 @@ class TmpFileMgr {
     /// Total space allocated in this group's files.
     int64_t current_bytes_allocated_;
 
-    /// Max write space allowed (-1 means no limit).
-    const int64_t bytes_limit_;
-
     /// Index into 'tmp_files' denoting the file to which the next temporary file range
     /// should be allocated from. Used to implement round-robin allocation from temporary
     /// files.
     int next_allocation_index_;
 
-    /// Amount of scratch space allocated in bytes.
-    RuntimeProfile::Counter* scratch_space_bytes_used_counter_;
+    /// List of File/offset pairs for free scratch ranges of size 'block_size_' bytes.
+    std::vector<std::pair<File*, int64_t>> free_ranges_;
+
+    /// Errors encountered when creating/writing scratch files. We store the history so
+    /// that we can report the original cause of the scratch errors if we run out of
+    /// devices to write to.
+    std::vector<Status> scratch_errors_;
+  };
+
+  /// A handle to a write operation, backed by a range of a temporary file. The operation
+  /// is either in-flight or has completed. If it completed with no error and wasn't
+  /// cancelled then the data is in the file and can be read back.
+  ///
+  /// WriteHandle is returned from FileGroup::Write(). After the write completes, the
+  /// handle can be passed to FileGroup::Read() to read back the data zero or more times.
+  /// FileGroup::DestroyWriteHandle() can be called at any time to destroy the handle and
+  /// allow reuse of the scratch file range written to. Alternatively,
+  /// FileGroup::CancelWriteAndRestoreData() can be called to reverse the effects of
+  /// FileGroup::Write() by destroying the handle and restoring the original data to the
+  /// buffer, so long as the data in the buffer was not modified by the caller.
+  ///
+  /// Public methods of WriteHandle are safe to call concurrently from multiple threads.
+  class WriteHandle {
+   public:
+    /// Handles must be destroyed through FileGroup, which cancels them first.
+    ~WriteHandle() {
+      DCHECK(!write_in_flight_);
+      DCHECK(is_cancelled_);
+    }
+
+    /// Path of temporary file backing the block. Intended for use in testing.
+    /// Returns empty string if no backing file allocated.
+    std::string TmpFilePath() const;
+
+    /// The length of the write range in bytes.
+    int64_t len() const { return write_range_->len(); }
+
+    std::string DebugString();
+
+   private:
+    friend class FileGroup;
+
+    WriteHandle(RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb);
+
+    /// Starts a write of 'buffer' to 'offset' of 'file'.
+    Status Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file,
+        int64_t offset, MemRange buffer,
+        DiskIoMgr::WriteRange::WriteDoneCallback callback);
+
+    /// Retry the write after the initial write failed with an error, instead writing to
+    /// 'offset' of 'file'.
+    Status RetryWrite(
+        DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset);
+
+    /// Cancels the write asynchronously. After Cancel() is called, writes are not
+    /// retried.
+    void Cancel();
+
+    /// Blocks until the write completes either successfully or unsuccessfully.
+    void WaitForWrite();
+
+    /// Called when the write has completed successfully or not. Sets 'write_in_flight_'
+    /// to false, then calls 'cb_'.
+    void WriteComplete(const Status& write_status);
+
+    /// Encrypts the data in 'buffer' in-place and computes 'hash_'.
+    Status EncryptAndHash(MemRange buffer);
+
+    /// Verifies the integrity hash and decrypts the contents of 'buffer' in place.
+    Status CheckHashAndDecrypt(MemRange buffer);
+
+    /// Callback to be called when the write completes.
+    WriteDoneCallback cb_;
+
+    /// Reference to the FileGroup's 'encryption_timer_'.
+    RuntimeProfile::Counter* encryption_timer_;
+
+    /// The DiskIoMgr write range for this write.
+    boost::scoped_ptr<DiskIoMgr::WriteRange> write_range_;
+
+    /// The temporary file being written to.
+    File* file_;
+
+    /// If --disk_spill_encryption is on, an AES 256-bit key and initialization vector.
+    /// Regenerated for each write.
+    EncryptionKey key_;
+
+    /// If --disk_spill_encryption is on, our hash of the data being written. Filled in
+    /// on writes; verified on reads. This is calculated _after_ encryption.
+    IntegrityHash hash_;
+
+    /// Protects all fields below while 'write_in_flight_' is true. At other times, it is
+    /// invalid to call WriteHandle/FileGroup methods concurrently from multiple threads,
+    /// so no locking is required. This is a terminal lock and should not be held while
+    /// acquiring other locks or invoking 'cb_'.
+    boost::mutex write_state_lock_;
+
+    /// True if the write has been cancelled (but is not necessarily complete).
+    bool is_cancelled_;
+
+    // True if a write is in flight.
+    bool write_in_flight_;
+
+    /// Signalled when the write completes and 'write_in_flight_' becomes false, before
+    /// 'cb_' is invoked.
+    ConditionVariable write_complete_cv_;
   };
 
   TmpFileMgr();
@@ -194,59 +360,27 @@ class TmpFileMgr {
 
   /// Total number of devices with tmp directories that are active. There is one tmp
   /// directory per device.
-  int num_active_tmp_devices();
+  int NumActiveTmpDevices();
 
   /// Return vector with device ids of all tmp devices being actively used.
   /// I.e. those that haven't been blacklisted.
-  std::vector<DeviceId> active_tmp_devices();
+  std::vector<DeviceId> ActiveTmpDevices();
 
  private:
-  /// Return a new File handle with a unique path for a query instance. The file is
-  /// associated with the file_group and the file path is within the (single) tmp
+  friend class TmpFileMgrTest;
+
+  /// Return a new File handle with a path based on file_group->unique_id. The file is
+  /// associated with the 'file_group' and the file path is within the (single) scratch
   /// directory on the specified device id. The caller owns the returned handle and is
   /// responsible for deleting it. The file is not created - creation is deferred until
-  /// the first call to File::AllocateSpace().
-  Status NewFile(FileGroup* file_group, const DeviceId& device_id,
-      const TUniqueId& unique_id, std::unique_ptr<File>* new_file);
-
-  /// Dir stores information about a temporary directory.
-  class Dir {
-   public:
-    const std::string& path() const { return path_; }
-
-    // Return true if it was newly added to blacklist.
-    bool blacklist() {
-      bool was_blacklisted = blacklisted_;
-      blacklisted_ = true;
-      return !was_blacklisted;
-    }
-    bool is_blacklisted() const { return blacklisted_; }
-
-   private:
-    friend class TmpFileMgr;
-
-    /// path should be a absolute path to a writable scratch directory.
-    Dir(const std::string& path, bool blacklisted)
-        : path_(path), blacklisted_(blacklisted) {}
-
-    std::string path_;
-
-    bool blacklisted_;
-  };
-
-  /// Remove a device from the rotation. Subsequent attempts to allocate a file on that
-  /// device will fail and the device will not be included in active tmp devices.
-  void BlacklistDevice(DeviceId device_id);
-
-  bool IsBlacklisted(DeviceId device_id);
+  /// the file is written.
+  Status NewFile(
+      FileGroup* file_group, DeviceId device_id, std::unique_ptr<File>* new_file);
 
   bool initialized_;
 
-  /// Protects the status of tmp dirs (i.e. whether they're blacklisted).
-  SpinLock dir_status_lock_;
-
-  /// The created tmp directories.
-  std::vector<Dir> tmp_dirs_;
+  /// The paths of the created tmp directories.
+  std::vector<std::string> tmp_dirs_;
 
   /// Metrics to track active scratch directories.
   IntGauge* num_active_scratch_dirs_metric_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/disk-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/disk-info.cc b/be/src/util/disk-info.cc
index d3eeb56..eba4f26 100644
--- a/be/src/util/disk-info.cc
+++ b/be/src/util/disk-info.cc
@@ -48,7 +48,6 @@ bool DiskInfo::initialized_;
 vector<DiskInfo::Disk> DiskInfo::disks_;
 map<dev_t, int> DiskInfo::device_id_to_disk_id_;
 map<string, int> DiskInfo::disk_name_to_disk_id_;
-int DiskInfo::num_datanode_dirs_;
 
 // Parses /proc/partitions to get the number of disks.  A bit of looking around
 // seems to indicate this as the best way to do this.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/disk-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/disk-info.h b/be/src/util/disk-info.h
index 4853511..323a265 100644
--- a/be/src/util/disk-info.h
+++ b/be/src/util/disk-info.h
@@ -43,24 +43,8 @@ class DiskInfo {
     return disks_.size();
   }
 
-#if 0
-  /// Returns the number of (logical) disks the data node is using.
-  /// It is possible for this to be more than num_disks since the datanode
-  /// can be configured to have multiple data directories on the same physical
-  /// disk.
-  static int num_datanode_dirs() {
-    DCHECK(initialized_);
-    return num_datanode_dirs_;
-  }
-
-  /// Returns a 0-based disk index for the data node dirs index.
-  static int disk_id(int datanode_dir_idx) {
-    return 0;
-  }
-#endif
-
   /// Returns the 0-based disk index for 'path' (path must be a FS path, not
-  /// hdfs path).
+  /// hdfs path). Returns -1 if the disk index is unknown.
   static int disk_id(const char* path);
 
   /// Returns the device name (e.g. sda) for disk_id
@@ -100,15 +84,11 @@ class DiskInfo {
 
   /// mapping of dev_ts to disk ids
   static std::map<dev_t, int> device_id_to_disk_id_;
-  
+
   /// mapping of devices names to disk ids
   static std::map<std::string, int> disk_name_to_disk_id_;
 
-  static int num_datanode_dirs_;
-
   static void GetDeviceNames();
 };
-
-
 }
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/filesystem-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/filesystem-util.cc b/be/src/util/filesystem-util.cc
index 01cd927..a0cacdf 100644
--- a/be/src/util/filesystem-util.cc
+++ b/be/src/util/filesystem-util.cc
@@ -114,17 +114,6 @@ Status FileSystemUtil::CreateFile(const string& file_path) {
   return Status::OK();
 }
 
-Status FileSystemUtil::ResizeFile(const string& file_path, int64_t trunc_len) {
-  int success = truncate(file_path.c_str(), trunc_len);
-  if (success != 0) {
-    return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, Substitute(
-        "Truncate file $0 to length $1 failed with errno $2 ($3)",
-        file_path, trunc_len, errno, GetStrErrMsg())));
-  }
-
-  return Status::OK();
-}
-
 Status FileSystemUtil::VerifyIsDirectory(const string& directory_path) {
   error_code errcode;
   bool exists = filesystem::exists(directory_path, errcode);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/filesystem-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/filesystem-util.h b/be/src/util/filesystem-util.h
index 887dc4b..3e824b8 100644
--- a/be/src/util/filesystem-util.h
+++ b/be/src/util/filesystem-util.h
@@ -36,9 +36,6 @@ class FileSystemUtil {
   /// Create a file at the specified path.
   static Status CreateFile(const std::string& file_path);
 
-  /// Resize a file to a specified length - uses unistd truncate().
-  static Status ResizeFile(const std::string& file_path, int64_t trunc_len);
-
   /// Remove the specified paths and their enclosing files/directories.
   static Status RemovePaths(const std::vector<std::string>& directories);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/mem-range.h
----------------------------------------------------------------------
diff --git a/be/src/util/mem-range.h b/be/src/util/mem-range.h
new file mode 100644
index 0000000..c55caaf
--- /dev/null
+++ b/be/src/util/mem-range.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_MEM_RANGE_H
+#define IMPALA_UTIL_MEM_RANGE_H
+
+#include <cstdint>
+
+#include "common/logging.h"
+
+namespace impala {
+
+/// Represents a range of memory. This is a convenient alternative to passing around
+/// a separate pointer and length.
+class MemRange {
+ public:
+  MemRange(uint8_t* data, int64_t len) : data_(data), len_(len) {
+    DCHECK_GE(len, 0);
+    DCHECK(len == 0 || data != nullptr);
+  }
+
+  uint8_t* data() const { return data_; }
+  int64_t len() const { return len_; }
+
+  static MemRange null() { return MemRange(nullptr, 0); }
+
+ private:
+  uint8_t* data_;
+  int64_t len_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index bb4251b..5088a1b 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -64,7 +64,7 @@ enum TParquetFallbackSchemaResolution {
 // metadata which overrides everything else.
 struct TQueryOptions {
   1: optional bool abort_on_error = 0
-  2: optional i32 max_errors = 0
+  2: optional i32 max_errors = 100
   3: optional bool disable_codegen = 0
   4: optional i32 batch_size = 0
   5: optional i32 num_nodes = NUM_NODES_ALL

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/testdata/workloads/functional-query/queries/QueryTest/set.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index e405caf..1dd1396 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -14,7 +14,7 @@ set
 'EXPLAIN_LEVEL','1'
 'HBASE_CACHE_BLOCKS','0'
 'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
 'MAX_IO_BUFFERS','0'
 'MAX_SCAN_RANGE_LENGTH','0'
 'MEM_LIMIT','0'
@@ -46,7 +46,7 @@ set;
 'EXPLAIN_LEVEL','3'
 'HBASE_CACHE_BLOCKS','0'
 'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
 'MAX_IO_BUFFERS','0'
 'MAX_SCAN_RANGE_LENGTH','0'
 'MEM_LIMIT','0'
@@ -78,7 +78,7 @@ set;
 'EXPLAIN_LEVEL','0'
 'HBASE_CACHE_BLOCKS','0'
 'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
 'MAX_IO_BUFFERS','0'
 'MAX_SCAN_RANGE_LENGTH','0'
 'MEM_LIMIT','0'
@@ -111,7 +111,7 @@ set;
 'EXPLAIN_LEVEL','1'
 'HBASE_CACHE_BLOCKS','0'
 'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
 'MAX_IO_BUFFERS','0'
 'MAX_SCAN_RANGE_LENGTH','0'
 'MEM_LIMIT','0'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/tests/custom_cluster/test_scratch_disk.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 1c02b56..f523dbe 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -52,22 +52,7 @@ class TestScratchDir(CustomClusterTestSuite):
   def get_dirs(dirs):
     return ','.join(dirs)
 
-  @classmethod
-  def setup_class(cls):
-    super(TestScratchDir, cls).setup_class()
-    cls.normal_dirs = cls.generate_dirs(5)
-    cls.non_writable_dirs = cls.generate_dirs(5, writable=False)
-    cls.non_existing_dirs = cls.generate_dirs(5, non_existing=True)
-
-  @classmethod
-  def teardown_class(cls):
-    for dir_path in cls.normal_dirs:
-      shutil.rmtree(dir_path)
-    for dir_path in cls.non_writable_dirs:
-      shutil.rmtree(dir_path)
-
-  @classmethod
-  def generate_dirs(cls, num, writable=True, non_existing=False):
+  def generate_dirs(self, num, writable=True, non_existing=False):
     result = []
     for i in xrange(num):
       dir_path = tempfile.mkdtemp()
@@ -75,27 +60,30 @@ class TestScratchDir(CustomClusterTestSuite):
         shutil.rmtree(dir_path)
       elif not writable:
         os.chmod(dir_path, stat.S_IREAD)
+      if not non_existing:
+        self.created_dirs.append(dir_path)
       result.append(dir_path)
+      print "Generated dir" + dir_path
     return result
 
   def setup_method(self, method):
-    # We are overriding this method to prevent starting Impala before each test. In this
-    # file, each test is responsible for doing that because we want to use class
-    # variables like cls.normal_dirs to generate the parameter string to
-    # start-impala-cluster, which are generated in setup_class (so using the with_args
-    # decorator does not work).
-    pass
+    # Don't call the superclass method to prevent starting Impala before each test. In
+    # this file, each test is responsible for doing that because we want to generate
+    # the parameter string to start-impala-cluster in each test method.
+    self.created_dirs = []
 
   def teardown_method(self, method):
-    pass
+    for dir_path in self.created_dirs:
+      shutil.rmtree(dir_path, ignore_errors=True)
 
   @pytest.mark.execute_serially
   def test_multiple_dirs(self, vector):
     """ 5 empty directories are created in the /tmp directory and we verify that only
         one of those directories is used as scratch disk. Only one should be used as
         scratch because all directories are on same disk."""
+    normal_dirs = self.generate_dirs(5)
     self._start_impala_cluster([
-      '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.normal_dirs))])
+      '--impalad_args="-scratch_dirs={0}"'.format(','.join(normal_dirs))])
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=1)
     exec_option = vector.get_value('exec_option')
@@ -103,10 +91,9 @@ class TestScratchDir(CustomClusterTestSuite):
     impalad = self.cluster.get_any_impalad()
     client = impalad.service.create_beeswax_client()
     self.execute_query_expect_success(client, self.spill_query, exec_option)
-    assert self.count_nonempty_dirs(self.normal_dirs) == 1
+    assert self.count_nonempty_dirs(normal_dirs) == 1
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("-scratch_dirs=")
   def test_no_dirs(self, vector):
     """ Test we can execute a query with no scratch dirs """
     self._start_impala_cluster(['--impalad_args="-scratch_dirs="'])
@@ -124,8 +111,9 @@ class TestScratchDir(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   def test_non_writable_dirs(self, vector):
     """ Test we can execute a query with only bad non-writable scratch """
+    non_writable_dirs = self.generate_dirs(5, writable=False)
     self._start_impala_cluster([
-      '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.non_writable_dirs))])
+      '--impalad_args="-scratch_dirs={0}"'.format(','.join(non_writable_dirs))])
     self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could "
         + "not use any scratch directories in list:.*. See previous "
         + "warnings for information on causes.")
@@ -139,13 +127,14 @@ class TestScratchDir(CustomClusterTestSuite):
     self.execute_query_expect_failure(client, self.spill_query, exec_option)
     # Should be able to execute in-memory query
     self.execute_query_expect_success(client, self.in_mem_query, exec_option)
-    assert self.count_nonempty_dirs(self.non_writable_dirs) == 0
+    assert self.count_nonempty_dirs(non_writable_dirs) == 0
 
   @pytest.mark.execute_serially
   def test_non_existing_dirs(self, vector):
     """ Test that non-existing directories are not created or used """
+    non_existing_dirs = self.generate_dirs(5, non_existing=True)
     self._start_impala_cluster([
-      '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.non_existing_dirs))])
+      '--impalad_args="-scratch_dirs={0}"'.format(','.join(non_existing_dirs))])
     self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could "
         + "not use any scratch directories in list:.*. See previous "
         + "warnings for information on causes.")
@@ -160,4 +149,27 @@ class TestScratchDir(CustomClusterTestSuite):
     self.execute_query_expect_failure(client, self.spill_query, exec_option)
     # Should be able to execute in-memory query
     self.execute_query_expect_success(client, self.in_mem_query, exec_option)
-    assert self.count_nonempty_dirs(self.non_existing_dirs) == 0
+    assert self.count_nonempty_dirs(non_existing_dirs) == 0
+
+  @pytest.mark.execute_serially
+  def test_write_error_failover(self, vector):
+    """ Test that we can fail-over to writable directories if other directories
+        have permissions changed or are removed after impalad startup."""
+    dirs = self.generate_dirs(3);
+    self._start_impala_cluster([
+      '--impalad_args="-scratch_dirs={0}"'.format(','.join(dirs)),
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'])
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+                                    expected_count=len(dirs))
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    # Trigger errors when writing to the first two directories.
+    shutil.rmtree(dirs[0]) # Remove the first directory.
+    # Make all subdirectories in the second directory non-writable.
+    for dirpath, dirnames, filenames in os.walk(dirs[1]):
+      os.chmod(dirpath, stat.S_IREAD)
+
+    # Should still be able to spill to the third directory.
+    impalad = self.cluster.get_any_impalad()
+    client = impalad.service.create_beeswax_client()
+    self.execute_query_expect_success(client, self.spill_query, exec_option)


[4/5] incubator-impala git commit: IMPALA-3202, IMPALA-2079: rework scratch file I/O

Posted by ta...@apache.org.
IMPALA-3202,IMPALA-2079: rework scratch file I/O

Refactor BufferedBlockMgr/TmpFileMgr to push more I/O logic into
TmpFileMgr, in anticipation of it being shared with BufferPool.
TmpFileMgr now handles:
* Scratch space allocation and recycling
* Read and write I/O

The interface is also greatly changed so that it is built around Write()
and Read() calls, abstracting away the details of temporary file
allocation from clients. This means the TmpFileMgr::File class can
be hidden from clients.

Write error recovery:
Also implement write error recovery in TmpFileMgr.

If an error occurs while writing to scratch and we have multiple
scratch directories, we will try one of the other directories
before cancelling the query. File-level blacklisting is used to
prevent excessive repeated attempts to resize a scratch file during
a single query. Device-level blacklisting is not implemented because
it is problematic to permanently take a scratch directory out of use.

To reduce the number of error paths, all I/O errors are now handled
asynchronously. Previously errors creating or extending the file were
returned synchronously from WriteUnpinnedBlock(). This required
modifying DiskIoMgr to create the file if not present when opened.

Also set the default max_errors value in the thrift definition file,
so that it is in effect for backend tests.

Future Work:
* Support for recycling variable-length scratch file ranges. I omitted
  this to avoid making the patch even larger.

Testing:
Updated BufferedBlockMgr unit test to reflect changes in behaviour:
* Scratch space is no longer permanently associated with a block, and
  is remapped every time a new block is written to disk.
* Files are now blacklisted - updated existing tests and enabled the
  previously disabled blacklisting test.

Added some basic testing of recycling of scratch file ranges in
the TmpFileMgr unit test.

I also manually tested the code in two ways. First by removing permissions
for /tmp/impala-scratch and ensuring that a spilling query fails cleanly.
Second, by creating a tiny ramdisk (16M) and running with two scratch
directories: one on /tmp and one on the tiny ramdisk. When spilling, an
out of space error is encountered for the tiny ramdisk and impala spills
the remaining data (72M) to /tmp.

Change-Id: I8c9c587df006d2f09d72dd636adafbd295fcdc17
Reviewed-on: http://gerrit.cloudera.org:8080/5141
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/95ed4434
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/95ed4434
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/95ed4434

Branch: refs/heads/master
Commit: 95ed4434f2f446e214934f7dc251b843c1d6b0a6
Parents: 6b90aa3
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Sep 4 10:54:11 2015 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Jan 5 02:26:24 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-block-mgr-test.cc       | 159 +++---
 be/src/runtime/buffered-block-mgr.cc            | 225 +++------
 be/src/runtime/buffered-block-mgr.h             |  90 ++--
 be/src/runtime/disk-io-mgr-test.cc              |  14 +-
 be/src/runtime/disk-io-mgr.cc                   |  42 +-
 be/src/runtime/disk-io-mgr.h                    |  19 +-
 be/src/runtime/exec-env.cc                      |   8 +-
 be/src/runtime/exec-env.h                       |   2 +-
 be/src/runtime/query-state.cc                   |   2 -
 be/src/runtime/tmp-file-mgr-internal.h          |  93 ++++
 be/src/runtime/tmp-file-mgr-test.cc             | 322 ++++++++----
 be/src/runtime/tmp-file-mgr.cc                  | 504 ++++++++++++++-----
 be/src/runtime/tmp-file-mgr.h                   | 430 ++++++++++------
 be/src/util/disk-info.cc                        |   1 -
 be/src/util/disk-info.h                         |  24 +-
 be/src/util/filesystem-util.cc                  |  11 -
 be/src/util/filesystem-util.h                   |   3 -
 be/src/util/mem-range.h                         |  47 ++
 common/thrift/ImpalaInternalService.thrift      |   2 +-
 .../functional-query/queries/QueryTest/set.test |   8 +-
 tests/custom_cluster/test_scratch_disk.py       |  72 +--
 21 files changed, 1331 insertions(+), 747 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 1828ff8..9b616f5 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -15,13 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <gutil/strings/substitute.h>
-#include <sys/stat.h>
 #include <boost/bind.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/filesystem.hpp>
+#include <boost/regex.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/thread.hpp>
+#include <gutil/strings/substitute.h>
+#include <sys/stat.h>
 
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
@@ -37,6 +38,7 @@
 #include "testutil/gtest-util.h"
 #include "util/cpu-info.h"
 #include "util/disk-info.h"
+#include "util/error-util.h"
 #include "util/filesystem-util.h"
 #include "util/promise.h"
 #include "util/test-info.h"
@@ -49,6 +51,7 @@
 
 using boost::filesystem::directory_iterator;
 using boost::filesystem::remove;
+using boost::regex;
 
 // Note: This is the default scratch dir created by impala.
 // FLAGS_scratch_dirs + TmpFileMgr::TMP_SUB_DIR_NAME.
@@ -100,7 +103,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
       created_tmp_dirs_.push_back(dir);
     }
     test_env_->InitTmpFileMgr(tmp_dirs, false);
-    EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->num_active_tmp_devices());
+    EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
     return tmp_dirs;
   }
 
@@ -254,9 +257,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
   }
 
   static bool AllWritesComplete(BufferedBlockMgr* block_mgr) {
-    RuntimeProfile::Counter* writes_outstanding =
-        block_mgr->profile()->GetCounter("BlockWritesOutstanding");
-    return writes_outstanding->value() == 0;
+    return block_mgr->GetNumWritesOutstanding() == 0;
   }
 
   static bool AllWritesComplete(const vector<BufferedBlockMgr*>& block_mgrs) {
@@ -266,13 +267,12 @@ class BufferedBlockMgrTest : public ::testing::Test {
     return true;
   }
 
-  // Delete the temporary file backing a block - all subsequent writes to the file
-  // should fail. Expects backing file has already been allocated.
-  static void DeleteBackingFile(BufferedBlockMgr::Block* block) {
-    const string& path = block->TmpFilePath();
-    ASSERT_GT(path.size(), 0);
-    ASSERT_TRUE(remove(path));
-    LOG(INFO) << "Injected fault by deleting file " << path;
+  // Remove permissions for the temporary file at 'path' - all subsequent writes
+  // to the file should fail. Expects backing file has already been allocated.
+  static void DisableBackingFile(const string& path) {
+    EXPECT_GT(path.size(), 0);
+    EXPECT_EQ(0, chmod(path.c_str(), 0));
+    LOG(INFO) << "Injected fault by removing file permissions " << path;
   }
 
   // Check that the file backing the block has dir as a prefix of its path.
@@ -910,9 +910,11 @@ void BufferedBlockMgrTest::TestRuntimeStateTeardown(
     UnpinBlocks(blocks);
     vector<BufferedBlockMgr::Block*> more_blocks;
     AllocateBlocks(block_mgr.get(), client, max_num_buffers, &more_blocks);
+
+    const string& tmp_file_path = blocks[0]->TmpFilePath();
     DeleteBlocks(more_blocks);
     PinBlocks(blocks);
-    DeleteBackingFile(blocks[0]);
+    DisableBackingFile(tmp_file_path);
   }
 
   // Unpin will initiate writes. If the write error propagates fast enough, some Unpin()
@@ -968,14 +970,15 @@ TEST_F(BufferedBlockMgrTest, WriteCompleteWithCancelledRuntimeState) {
   DeleteBlocks(blocks);
 }
 
-// Clear scratch directory. Return # of files deleted.
-static int clear_scratch_dir() {
+// Remove write permissions on scratch files. Return # of scratch files.
+static int remove_scratch_perms() {
   int num_files = 0;
   directory_iterator dir_it(SCRATCH_DIR);
   for (; dir_it != directory_iterator(); ++dir_it) {
     ++num_files;
-    remove_all(dir_it->path());
+    chmod(dir_it->path().c_str(), 0);
   }
+
   return num_files;
 }
 
@@ -997,7 +1000,7 @@ TEST_F(BufferedBlockMgrTest, WriteError) {
   // Repin the blocks
   PinBlocks(blocks);
   // Remove the backing storage so that future writes will fail
-  int num_files = clear_scratch_dir();
+  int num_files = remove_scratch_perms();
   ASSERT_GT(num_files, 0);
   UnpinBlocks(blocks, true);
   WaitForWrites(block_mgr);
@@ -1024,23 +1027,25 @@ TEST_F(BufferedBlockMgrTest, TmpFileAllocateError) {
   ASSERT_OK(blocks[0]->Unpin());
   WaitForWrites(block_mgr);
   // Remove temporary files - subsequent operations will fail.
-  int num_files = clear_scratch_dir();
-  ASSERT_GT(num_files, 0);
-  // Current implementation will fail here because it tries to expand the tmp file
-  // immediately. This behavior is not contractual but we want to know if it changes
-  // accidentally.
-  Status status = blocks[1]->Unpin();
-  ASSERT_FALSE(status.ok());
+  int num_files = remove_scratch_perms();
+  ASSERT_TRUE(num_files > 0);
+  // Current implementation will not fail here until it attempts to write the file.
+  // This behavior is not contractual but we want to know if it changes accidentally.
+  ASSERT_OK(blocks[1]->Unpin());
+
+  // Write failure should cancel query
+  WaitForWrites(block_mgr);
+  ASSERT_TRUE(block_mgr->IsCancelled());
 
   DeleteBlocks(blocks);
   TearDownMgrs();
 }
 
 // Test that the block manager is able to blacklist a temporary device correctly after a
-// write error. We should not allocate more blocks on that device, but existing blocks
-// on the device will remain in use.
-/// Disabled because blacklisting was disabled as workaround for IMPALA-2305.
-TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) {
+// write error. The query that encountered the write error should not allocate more
+// blocks on that device, but existing blocks on the device will remain in use and future
+// queries will use the device.
+TEST_F(BufferedBlockMgrTest, WriteErrorBlacklist) {
   // Set up two buffered block managers with two temporary dirs.
   vector<string> tmp_dirs = InitMultipleTmpDirs(2);
   // Simulate two concurrent queries.
@@ -1074,50 +1079,71 @@ TEST_F(BufferedBlockMgrTest, DISABLED_WriteErrorBlacklist) {
   // Delete one file from first scratch dir for first block manager.
   BufferedBlockMgr::Block* error_block = FindBlockForDir(blocks[error_mgr], error_dir);
   ASSERT_TRUE(error_block != NULL) << "Expected a tmp file in dir " << error_dir;
+  const string& error_file_path = error_block->TmpFilePath();
   PinBlocks(all_blocks);
-  DeleteBackingFile(error_block);
-  UnpinBlocks(all_blocks); // Should succeed since tmp file space was already allocated.
+  DisableBackingFile(error_file_path);
+  UnpinBlocks(all_blocks); // Should succeed since writes occur asynchronously
   WaitForWrites(block_mgrs);
-  ASSERT_TRUE(block_mgrs[error_mgr]->IsCancelled());
+  // Both block managers have a usable tmp directory so should still be usable.
+  ASSERT_FALSE(block_mgrs[error_mgr]->IsCancelled());
   ASSERT_FALSE(block_mgrs[no_error_mgr]->IsCancelled());
-  // Temporary device with error should no longer be active.
+  // Temporary device with error should still be active.
   vector<TmpFileMgr::DeviceId> active_tmp_devices =
-      test_env_->tmp_file_mgr()->active_tmp_devices();
-  ASSERT_EQ(tmp_dirs.size() - 1, active_tmp_devices.size());
+      test_env_->tmp_file_mgr()->ActiveTmpDevices();
+  ASSERT_EQ(tmp_dirs.size(), active_tmp_devices.size());
   for (int i = 0; i < active_tmp_devices.size(); ++i) {
     const string& device_path =
         test_env_->tmp_file_mgr()->GetTmpDirPath(active_tmp_devices[i]);
     ASSERT_EQ(string::npos, error_dir.find(device_path));
   }
-  // The second block manager should continue using allocated scratch space, since it
-  // didn't encounter a write error itself. In future this could change but for now it is
-  // the intended behaviour.
+
+  // The error block manager should only allocate from the device that had no error.
+  // The non-error block manager should continue using both devices, since it didn't
+  // encounter a write error itself.
+  vector<BufferedBlockMgr::Block*> error_new_blocks;
+  AllocateBlocks(
+      block_mgrs[error_mgr], clients[error_mgr], blocks_per_mgr, &error_new_blocks);
+  UnpinBlocks(error_new_blocks);
+  WaitForWrites(block_mgrs);
+  EXPECT_TRUE(FindBlockForDir(error_new_blocks, good_dir) != NULL);
+  EXPECT_TRUE(FindBlockForDir(error_new_blocks, error_dir) == NULL);
+  for (int i = 0; i < error_new_blocks.size(); ++i) {
+    LOG(INFO) << "Newly created block backed by file "
+              << error_new_blocks[i]->TmpFilePath();
+    EXPECT_TRUE(BlockInDir(error_new_blocks[i], good_dir));
+  }
+  DeleteBlocks(error_new_blocks);
+
   PinBlocks(blocks[no_error_mgr]);
   UnpinBlocks(blocks[no_error_mgr]);
-  ASSERT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL);
-  ASSERT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL);
-  // The second block manager should avoid using bad directory for new blocks.
+  WaitForWrites(block_mgrs);
+  EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], good_dir) != NULL);
+  EXPECT_TRUE(FindBlockForDir(blocks[no_error_mgr], error_dir) != NULL);
+
+  // The second block manager should use the bad directory for new blocks since
+  // blacklisting is per-manager, not global.
   vector<BufferedBlockMgr::Block*> no_error_new_blocks;
   AllocateBlocks(block_mgrs[no_error_mgr], clients[no_error_mgr], blocks_per_mgr,
       &no_error_new_blocks);
   UnpinBlocks(no_error_new_blocks);
-  for (int i = 0; i < no_error_new_blocks.size(); ++i) {
-    LOG(INFO) << "Newly created block backed by file "
-              << no_error_new_blocks[i]->TmpFilePath();
-    ASSERT_TRUE(BlockInDir(no_error_new_blocks[i], good_dir));
-  }
-  // A new block manager should only use the good dir for backing storage.
+  WaitForWrites(block_mgrs);
+  EXPECT_TRUE(FindBlockForDir(no_error_new_blocks, good_dir) != NULL);
+  EXPECT_TRUE(FindBlockForDir(no_error_new_blocks, error_dir) != NULL);
+  DeleteBlocks(no_error_new_blocks);
+
+  // A new block manager should use both dirs for backing storage.
   BufferedBlockMgr::Client* new_client;
   BufferedBlockMgr* new_block_mgr =
       CreateMgrAndClient(9999, blocks_per_mgr, block_size_, 0, false, &new_client);
   vector<BufferedBlockMgr::Block*> new_mgr_blocks;
   AllocateBlocks(new_block_mgr, new_client, blocks_per_mgr, &new_mgr_blocks);
   UnpinBlocks(new_mgr_blocks);
-  for (int i = 0; i < blocks_per_mgr; ++i) {
-    LOG(INFO) << "New manager Block " << i << " backed by file "
-              << new_mgr_blocks[i]->TmpFilePath();
-    ASSERT_TRUE(BlockInDir(new_mgr_blocks[i], good_dir));
-  }
+  WaitForWrites(block_mgrs);
+  EXPECT_TRUE(FindBlockForDir(new_mgr_blocks, good_dir) != NULL);
+  EXPECT_TRUE(FindBlockForDir(new_mgr_blocks, error_dir) != NULL);
+  DeleteBlocks(new_mgr_blocks);
+
+  DeleteBlocks(all_blocks);
 }
 
 // Check that allocation error resulting from removal of directory results in blocks
@@ -1151,7 +1177,7 @@ TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) {
   // use the good dir.
   UnpinBlocks(blocks[0]);
   // Directories remain on active list even when they experience errors.
-  ASSERT_EQ(2, test_env_->tmp_file_mgr()->num_active_tmp_devices());
+  ASSERT_EQ(2, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
   // Blocks should not be written to bad dir even if it remains non-writable.
   UnpinBlocks(blocks[1]);
   // All writes should succeed.
@@ -1165,18 +1191,39 @@ TEST_F(BufferedBlockMgrTest, AllocationErrorHandling) {
 TEST_F(BufferedBlockMgrTest, NoDirsAllocationError) {
   vector<string> tmp_dirs = InitMultipleTmpDirs(2);
   int max_num_buffers = 2;
+  RuntimeState* runtime_state;
   BufferedBlockMgr::Client* client;
-  BufferedBlockMgr* block_mgr =
-      CreateMgrAndClient(0, max_num_buffers, block_size_, 0, false, &client);
+  BufferedBlockMgr* block_mgr = CreateMgrAndClient(
+      0, max_num_buffers, block_size_, 0, false, &client, &runtime_state);
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
   for (int i = 0; i < tmp_dirs.size(); ++i) {
     const string& tmp_scratch_subdir = tmp_dirs[i] + SCRATCH_SUFFIX;
     chmod(tmp_scratch_subdir.c_str(), 0);
   }
+  ErrorLogMap error_log;
+  runtime_state->GetErrors(&error_log);
+  ASSERT_TRUE(error_log.empty());
   for (int i = 0; i < blocks.size(); ++i) {
-    ASSERT_FALSE(blocks[i]->Unpin().ok());
+    // Writes won't fail until the actual I/O is attempted.
+    ASSERT_OK(blocks[i]->Unpin());
   }
+
+  LOG(INFO) << "Waiting for writes.";
+  // Write failure should cancel query.
+  WaitForWrites(block_mgr);
+  LOG(INFO) << "writes done.";
+  ASSERT_TRUE(block_mgr->IsCancelled());
+  runtime_state->GetErrors(&error_log);
+  ASSERT_FALSE(error_log.empty());
+  stringstream error_string;
+  PrintErrorMap(&error_string, error_log);
+  LOG(INFO) << "Errors: " << error_string.str();
+  ASSERT_NE(
+      string::npos, error_string.str().find("No usable scratch files: space could "
+                                            "not be allocated in any of the configured "
+                                            "scratch directories (--scratch_dirs)"))
+      << error_string.str();
   DeleteBlocks(blocks);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index 0c3d25f..d4e14a2 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -29,12 +29,9 @@
 
 #include <gutil/strings/substitute.h>
 
-DEFINE_bool(disk_spill_encryption, false, "Set this to encrypt and perform an integrity "
-  "check on all data spilled to disk during a query");
-
 #include "common/names.h"
 
-using namespace strings;   // for Substitute
+using namespace strings; // for Substitute
 
 namespace impala {
 
@@ -132,11 +129,8 @@ BufferedBlockMgr::Block::Block(BufferedBlockMgr* block_mgr)
   : buffer_desc_(NULL),
     block_mgr_(block_mgr),
     client_(NULL),
-    write_range_(NULL),
-    tmp_file_(NULL),
     valid_data_len_(0),
-    num_rows_(0) {
-}
+    num_rows_(0) {}
 
 Status BufferedBlockMgr::Block::Pin(bool* pinned, Block* release_block, bool unpin) {
   return block_mgr_->PinBlock(this, pinned, release_block, unpin);
@@ -185,8 +179,8 @@ bool BufferedBlockMgr::Block::Validate() const {
 }
 
 string BufferedBlockMgr::Block::TmpFilePath() const {
-  if (tmp_file_ == NULL) return "";
-  return tmp_file_->path();
+  if (write_handle_ == NULL) return "";
+  return write_handle_->TmpFilePath();
 }
 
 string BufferedBlockMgr::Block::DebugString() const {
@@ -200,6 +194,9 @@ string BufferedBlockMgr::Block::DebugString() const {
      << "  Pinned: " << is_pinned_ << endl
      << "  Write Issued: " << in_write_ << endl
      << "  Client Local: " << client_local_ << endl;
+  if (write_handle_ != NULL) {
+    ss << "  Write handle: " << write_handle_->DebugString() << endl;
+  }
   if (client_ != NULL) ss << "  Client: " << client_->DebugString();
   return ss.str();
 }
@@ -208,7 +205,7 @@ BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr
     int64_t block_size, int64_t scratch_limit)
   : max_block_size_(block_size),
     // Keep two writes in flight per scratch disk so the disks can stay busy.
-    block_write_threshold_(tmp_file_mgr->num_active_tmp_devices() * 2),
+    block_write_threshold_(tmp_file_mgr->NumActiveTmpDevices() * 2),
     disable_spill_(state->query_ctx().disable_spilling || block_write_threshold_ == 0
         || scratch_limit == 0),
     query_id_(state->query_id()),
@@ -217,7 +214,6 @@ BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr
     total_pinned_buffers_(0),
     non_local_outstanding_writes_(0),
     tmp_file_group_(NULL),
-    io_mgr_(state->io_mgr()),
     is_cancelled_(false),
     writes_issued_(0),
     debug_write_delay_ms_(0) {}
@@ -356,7 +352,7 @@ bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) {
   // If we either couldn't acquire enough buffers or WriteUnpinnedBlocks() failed, undo
   // the reservation.
   if (buffers_acquired != buffers_needed || !status.ok()) {
-    if (!status.ok()) {
+    if (!status.ok() && !status.IsCancelled()) {
       VLOG_QUERY << "Query: " << query_id_ << " write unpinned buffers failed.";
       client->state_->LogError(status.msg());
     }
@@ -388,8 +384,6 @@ void BufferedBlockMgr::Cancel() {
     if (is_cancelled_) return;
     is_cancelled_ = true;
   }
-  // Cancel the underlying io mgr to unblock any waiting threads.
-  io_mgr_->CancelContext(io_request_context_);
 }
 
 bool BufferedBlockMgr::IsCancelled() {
@@ -548,14 +542,13 @@ BufferedBlockMgr::~BufferedBlockMgr() {
   // Do not do that with 'static_block_mgrs_lock_' held.
   other_mgr_ptr.reset();
 
-  if (io_request_context_ != NULL) io_mgr_->UnregisterContext(io_request_context_);
+  // Delete tmp files and cancel any in-flight writes.
+  tmp_file_group_->Close();
 
   // If there are any outstanding writes and we are here it means that when the
   // WriteComplete() callback gets executed it is going to access invalid memory.
   // See IMPALA-1890.
   DCHECK_EQ(non_local_outstanding_writes_, 0) << endl << DebugInternal();
-  // Delete tmp files.
-  tmp_file_group_->Close();
 
   // Validate that clients deleted all of their blocks. Since all writes have
   // completed at this point, any deleted blocks should be in unused_blocks_.
@@ -591,6 +584,13 @@ MemTracker* BufferedBlockMgr::get_tracker(Client* client) const {
   return client->tracker_;
 }
 
+int64_t BufferedBlockMgr::GetNumWritesOutstanding() {
+  // Acquire lock to avoid returning mid-way through WriteComplete() when the
+  // state may be inconsistent.
+  lock_guard<mutex> lock(lock_);
+  return profile()->GetCounter("BlockWritesOutstanding")->value();
+}
+
 Status BufferedBlockMgr::DeleteOrUnpinBlock(Block* block, bool unpin) {
   if (block == NULL) {
     return IsCancelled() ? Status::CANCELLED : Status::OK();
@@ -619,8 +619,10 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_blo
   if (!status.ok()) goto error;
   *pinned = block->is_pinned_;
 
-  // Block was not evicted or had no data, nothing left to do.
-  if (in_mem || block->valid_data_len_ == 0) {
+  if (in_mem) {
+    // The block's buffer is still in memory with the original data.
+    status = CancelWrite(block);
+    if (!status.ok()) goto error;
     return DeleteOrUnpinBlock(release_block, unpin);
   }
 
@@ -628,6 +630,9 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_blo
     if (release_block == NULL) return Status::OK();
 
     if (block->buffer_desc_ != NULL) {
+      // The block's buffer is still in memory but we couldn't get an additional buffer
+      // because it would eat into another client's reservation. However, we can use
+      // release_block's reservation, so reclaim the buffer.
       {
         lock_guard<mutex> lock(lock_);
         if (free_io_buffers_.Contains(block->buffer_desc_)) {
@@ -646,9 +651,12 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_blo
         status = WriteUnpinnedBlocks();
         if (!status.ok()) goto error;
       }
+      status = CancelWrite(block);
+      if (!status.ok()) goto error;
       return DeleteOrUnpinBlock(release_block, unpin);
     }
-
+    // FindBufferForBlock() wasn't able to find a buffer so transfer the one from
+    // 'release_block'.
     status = TransferBuffer(block, release_block, unpin);
     if (!status.ok()) goto error;
     DCHECK(!release_block->is_pinned_);
@@ -657,33 +665,14 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_blo
     *pinned = true;
   }
 
-  DCHECK(block->write_range_ != NULL) << block->DebugString() << endl << release_block;
-
-  {
-    // Read the block from disk if it was not in memory.
-    SCOPED_TIMER(disk_read_timer_);
-    // Create a ScanRange to perform the read.
-    DiskIoMgr::ScanRange* scan_range =
-        obj_pool_.Add(new DiskIoMgr::ScanRange());
-    scan_range->Reset(NULL, block->write_range_->file(), block->write_range_->len(),
-        block->write_range_->offset(), block->write_range_->disk_id(), false,
-        DiskIoMgr::BufferOpts::ReadInto(block->buffer(), block->buffer_len()));
-    DiskIoMgr::BufferDescriptor* io_mgr_buffer;
-    status = io_mgr_->Read(io_request_context_, scan_range, &io_mgr_buffer);
-    if (!status.ok()) goto error;
-
-    DCHECK_EQ(io_mgr_buffer->buffer(), block->buffer());
-    DCHECK_EQ(io_mgr_buffer->len(), block->valid_data_len());
-    DCHECK(io_mgr_buffer->eosr());
-    io_mgr_buffer->Return();
-  }
+  DCHECK(block->write_handle_ != NULL) << block->DebugString() << endl << release_block;
 
-  if (FLAGS_disk_spill_encryption) {
-    // Decryption is done in-place, since the buffer can't be accessed by anyone else.
-    status = CheckHashAndDecrypt(block);
+  // The block is on disk - read it back into memory.
+  if (block->valid_data_len() > 0) {
+    status = tmp_file_group_->Read(block->write_handle_.get(), block->valid_data());
     if (!status.ok()) goto error;
   }
-
+  tmp_file_group_->DestroyWriteHandle(move(block->write_handle_));
   return DeleteOrUnpinBlock(release_block, unpin);
 
 error:
@@ -693,6 +682,24 @@ error:
   return status;
 }
 
+Status BufferedBlockMgr::CancelWrite(Block* block) {
+  {
+    unique_lock<mutex> lock(lock_);
+    DCHECK(block->buffer_desc_ != NULL);
+    // If there is an in-flight write, wait for it to finish. This is sub-optimal
+    // compared to just cancelling the write, but reduces the number of possible
+    // code paths in this legacy code.
+    WaitForWrite(lock, block);
+    if (is_cancelled_) return Status::CANCELLED;
+  }
+  if (block->write_handle_ != NULL) {
+    // Restore the in-memory data without reading from disk (e.g. decrypt it).
+    RETURN_IF_ERROR(tmp_file_group_->CancelWriteAndRestoreData(
+        move(block->write_handle_), block->valid_data()));
+  }
+  return Status::OK();
+}
+
 Status BufferedBlockMgr::UnpinBlock(Block* block) {
   DCHECK(!block->is_deleted_) << "Unpin for deleted block.";
 
@@ -738,49 +745,17 @@ Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) {
   // Assumes block manager lock is already taken.
   DCHECK(!block->is_pinned_) << block->DebugString();
   DCHECK(!block->in_write_) << block->DebugString();
+  DCHECK(block->write_handle_ == NULL) << block->DebugString();
   DCHECK_EQ(block->buffer_desc_->len, max_block_size_);
 
-  if (block->write_range_ == NULL) {
-    if (tmp_file_group_->NumFiles() == 0) {
-      RETURN_IF_ERROR(tmp_file_group_->CreateFiles(query_id_));
-    }
+  // Issue the write of the block's data; WriteComplete() is the completion callback.
+  RETURN_IF_ERROR(tmp_file_group_->Write(block->valid_data(),
+      [this, block](const Status& write_status) { WriteComplete(block, write_status); },
+      &block->write_handle_));
 
-    // First time the block is being persisted - need to allocate tmp file space.
-    TmpFileMgr::File* tmp_file;
-    int64_t file_offset;
-    RETURN_IF_ERROR(
-        tmp_file_group_->AllocateSpace(max_block_size_, &tmp_file, &file_offset));
-    int disk_id = tmp_file->disk_id();
-    if (disk_id < 0) {
-      // Assign a valid disk id to the write range if the tmp file was not assigned one.
-      static unsigned int next_disk_id = 0;
-      disk_id = ++next_disk_id;
-    }
-    disk_id %= io_mgr_->num_local_disks();
-    DiskIoMgr::WriteRange::WriteDoneCallback callback =
-        bind(mem_fn(&BufferedBlockMgr::WriteComplete), this, block, _1);
-    block->write_range_ = obj_pool_.Add(new DiskIoMgr::WriteRange(
-        tmp_file->path(), file_offset, disk_id, callback));
-    block->tmp_file_ = tmp_file;
-  }
-
-  uint8_t* outbuf = NULL;
-  if (FLAGS_disk_spill_encryption) {
-    // The block->buffer() could be accessed during the write path, so we have to
-    // make a copy of it while writing.
-    RETURN_IF_ERROR(EncryptAndHash(block, &outbuf));
-  } else {
-    outbuf = block->buffer();
-  }
-
-  block->write_range_->SetData(outbuf, block->valid_data_len_);
-
-  // Issue write through DiskIoMgr.
-  RETURN_IF_ERROR(io_mgr_->AddWriteRange(io_request_context_, block->write_range_));
   block->in_write_ = true;
   DCHECK(block->Validate()) << endl << block->DebugString();
   outstanding_writes_counter_->Add(1);
-  bytes_written_counter_->Add(block->valid_data_len_);
   ++writes_issued_;
   if (writes_issued_ == 1) {
     if (ImpaladMetrics::NUM_QUERIES_SPILLED != NULL) {
@@ -805,25 +780,22 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) {
 #endif
   Status status = Status::OK();
   lock_guard<mutex> lock(lock_);
-  outstanding_writes_counter_->Add(-1);
   DCHECK(Validate()) << endl << DebugInternal();
   DCHECK(is_cancelled_ || block->in_write_) << "WriteComplete() for block not in write."
-                                            << endl << block->DebugString();
+                                            << endl
+                                            << block->DebugString();
   DCHECK(block->buffer_desc_ != NULL);
+
+  outstanding_writes_counter_->Add(-1);
   if (!block->client_local_) {
     DCHECK_GT(non_local_outstanding_writes_, 0) << block->DebugString();
     --non_local_outstanding_writes_;
   }
   block->in_write_ = false;
 
-  // Explicitly release our temporarily allocated buffer here so that it doesn't
-  // hang around needlessly.
-  if (FLAGS_disk_spill_encryption) EncryptedWriteComplete(block);
-
   // ReturnUnusedBlock() will clear the block, so save required state in local vars.
   // state is not valid if the block was deleted because the state may be torn down
   // after the state's fragment has deleted all of its blocks.
-  TmpFileMgr::File* tmp_file = block->tmp_file_;
   RuntimeState* state = block->is_deleted_ ? NULL : block->client_->state_;
 
   // If the block was re-pinned when it was in the IOMgr queue, don't free it.
@@ -847,18 +819,17 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) {
 
   if (!write_status.ok() || !status.ok() || is_cancelled_) {
     VLOG_FILE << "Query: " << query_id_ << ". Write did not complete successfully: "
-        "write_status=" << write_status.GetDetail() << ", status=" << status.GetDetail()
-        << ". is_cancelled_=" << is_cancelled_;
-
+                                           "write_status="
+              << write_status.GetDetail() << ", status=" << status.GetDetail()
+              << ". is_cancelled_=" << is_cancelled_;
     // If the instance is already cancelled, don't confuse things with these errors.
     if (!write_status.ok() && !write_status.IsCancelled()) {
       // Report but do not attempt to recover from write error.
-      DCHECK(tmp_file != NULL);
-      if (!write_status.IsMemLimitExceeded()) tmp_file->ReportIOError(write_status.msg());
       VLOG_QUERY << "Query: " << query_id_ << " write complete callback with error.";
+
       if (state != NULL) state->LogError(write_status.msg());
     }
-    if (!status.ok()) {
+    if (!status.ok() && !status.IsCancelled()) {
       VLOG_QUERY << "Query: " << query_id_ << " error while writing unpinned blocks.";
       if (state != NULL) state->LogError(status.msg());
     }
@@ -875,6 +846,7 @@ void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) {
   if (!block->client_local_) buffer_available_cv_.notify_all();
   if (block->is_deleted_) {
     // Finish the DeleteBlock() work.
+    tmp_file_group_->DestroyWriteHandle(move(block->write_handle_));
     block->buffer_desc_->block = NULL;
     block->buffer_desc_ = NULL;
     ReturnUnusedBlock(block);
@@ -913,7 +885,9 @@ void BufferedBlockMgr::DeleteBlockLocked(const unique_lock<mutex>& lock, Block*
   if (block->in_write_) {
     DCHECK(block->buffer_desc_ != NULL && block->buffer_desc_->len == max_block_size_)
         << "Should never be writing a small buffer";
-    // If a write is still pending, return. Cleanup will be done in WriteComplete().
+    // If a write is still pending, cancel it and return. Cleanup will be done in
+    // WriteComplete(). Cancelling the write ensures that it won't try to log to the
+    // RuntimeState (which may be torn down before the block manager).
     DCHECK(block->Validate()) << endl << block->DebugString();
     return;
   }
@@ -935,6 +909,12 @@ void BufferedBlockMgr::DeleteBlockLocked(const unique_lock<mutex>& lock, Block*
       block->buffer_desc_ = NULL;
     }
   }
+
+  // Discard any on-disk data. The write is finished so this won't call back into
+  // BufferedBlockMgr.
+  if (block->write_handle_ != NULL) {
+    tmp_file_group_->DestroyWriteHandle(move(block->write_handle_));
+  }
   ReturnUnusedBlock(block);
   DCHECK(block->Validate()) << endl << block->DebugString();
   DCHECK(Validate()) << endl << DebugInternal();
@@ -1224,7 +1204,7 @@ string BufferedBlockMgr::DebugString(Client* client) {
 
 string BufferedBlockMgr::DebugInternal() const {
   stringstream ss;
-  ss << "Buffered block mgr" << endl
+  ss << "Buffered block mgr " << this << endl
      << "  Num writes outstanding: " << outstanding_writes_counter_->value() << endl
      << "  Num free io buffers: " << free_io_buffers_.size() << endl
      << "  Num unpinned blocks: " << unpinned_blocks_.size() << endl
@@ -1234,6 +1214,7 @@ string BufferedBlockMgr::DebugInternal() const {
      << "  Remaining memory: " << mem_tracker_->SpareCapacity()
      << " (#blocks=" << (mem_tracker_->SpareCapacity() / max_block_size_) << ")" << endl
      << "  Block write threshold: " << block_write_threshold_;
+  if (tmp_file_group_ != NULL) ss << tmp_file_group_->DebugString();
   return ss.str();
 }
 
@@ -1243,13 +1224,11 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr,
   unique_lock<mutex> l(lock_);
   if (initialized_) return;
 
-  io_mgr->RegisterContext(&io_request_context_, NULL);
-
   profile_.reset(new RuntimeProfile(&obj_pool_, "BlockMgr"));
   parent_profile->AddChild(profile_.get());
 
-  tmp_file_group_.reset(
-      new TmpFileMgr::FileGroup(tmp_file_mgr, profile_.get(), scratch_limit));
+  tmp_file_group_.reset(new TmpFileMgr::FileGroup(
+      tmp_file_mgr, io_mgr, profile_.get(), query_id_, max_block_size_, scratch_limit));
 
   mem_limit_counter_ = ADD_COUNTER(profile_.get(), "MemoryLimit", TUnit::BYTES);
   mem_limit_counter_->Set(mem_limit);
@@ -1257,13 +1236,10 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr,
   block_size_counter_->Set(max_block_size_);
   created_block_counter_ = ADD_COUNTER(profile_.get(), "BlocksCreated", TUnit::UNIT);
   recycled_blocks_counter_ = ADD_COUNTER(profile_.get(), "BlocksRecycled", TUnit::UNIT);
-  bytes_written_counter_ = ADD_COUNTER(profile_.get(), "BytesWritten", TUnit::BYTES);
   outstanding_writes_counter_ =
       ADD_COUNTER(profile_.get(), "BlockWritesOutstanding", TUnit::UNIT);
   buffered_pin_counter_ = ADD_COUNTER(profile_.get(), "BufferedPins", TUnit::UNIT);
-  disk_read_timer_ = ADD_TIMER(profile_.get(), "TotalReadBlockTime");
   buffer_wait_timer_ = ADD_TIMER(profile_.get(), "TotalBufferWaitTime");
-  encryption_timer_ = ADD_TIMER(profile_.get(), "TotalEncryptionTime");
 
   // Create a new mem_tracker and allocate buffers.
   mem_tracker_.reset(
@@ -1272,45 +1248,4 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr,
   initialized_ = true;
 }
 
-Status BufferedBlockMgr::EncryptAndHash(Block* block, uint8_t** outbuf) {
-  DCHECK(FLAGS_disk_spill_encryption);
-  DCHECK(block->buffer());
-  DCHECK(!block->is_pinned_);
-  DCHECK(!block->in_write_);
-  DCHECK(outbuf);
-  SCOPED_TIMER(encryption_timer_);
-  // Encrypt to a temporary buffer since so that the original data is still in the buffer
-  // if the block is re-pinned while the write is still in-flight.
-  block->encrypted_write_buffer_.reset(new uint8_t[block->valid_data_len_]);
-  // Since we're using AES-CFB mode, we must take care not to reuse a key/IV pair.
-  // Regenerate a new key and IV for every block of data we write, including between
-  // writes of the same Block.
-  block->key_.InitializeRandom();
-  RETURN_IF_ERROR(block->key_.Encrypt(
-      block->buffer(), block->valid_data_len_, block->encrypted_write_buffer_.get()));
-
-  block->hash_.Compute(block->encrypted_write_buffer_.get(), block->valid_data_len_);
-
-  *outbuf = block->encrypted_write_buffer_.get();
-  return Status::OK();
-}
-
-void BufferedBlockMgr::EncryptedWriteComplete(Block* block) {
-  DCHECK(FLAGS_disk_spill_encryption);
-  DCHECK(block->encrypted_write_buffer_.get());
-  block->encrypted_write_buffer_.reset();
-}
-
-Status BufferedBlockMgr::CheckHashAndDecrypt(Block* block) {
-  DCHECK(FLAGS_disk_spill_encryption);
-  DCHECK(block->buffer());
-  SCOPED_TIMER(encryption_timer_);
-
-  if (!block->hash_.Verify(block->buffer(), block->valid_data_len_)) {
-    return Status("Block verification failure");
-  }
-  // Decrypt block->buffer() in-place. Safe because no one is accessing it.
-  return block->key_.Decrypt(block->buffer(), block->valid_data_len_, block->buffer());
-}
-
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h
index 269b707..bbe8429 100644
--- a/be/src/runtime/buffered-block-mgr.h
+++ b/be/src/runtime/buffered-block-mgr.h
@@ -20,7 +20,7 @@
 
 #include "runtime/disk-io-mgr.h"
 #include "runtime/tmp-file-mgr.h"
-#include "util/openssl-util.h"
+#include "util/mem-range.h"
 
 namespace impala {
 
@@ -175,6 +175,13 @@ class BufferedBlockMgr {
       return buffer_desc_->buffer;
     }
 
+    /// Returns a reference to the valid data in the block's buffer. Only guaranteed to
+    /// be valid if the block is pinned.
+    MemRange valid_data() const {
+      DCHECK(buffer_desc_ != NULL);
+      return MemRange(buffer_desc_->buffer, valid_data_len_);
+    }
+
     /// Return the number of bytes allocated in this block.
     int64_t valid_data_len() const { return valid_data_len_; }
 
@@ -223,17 +230,9 @@ class BufferedBlockMgr {
     /// The client that owns this block.
     Client* client_;
 
-    /// WriteRange object representing the on-disk location used to persist a block.
-    /// Is created the first time a block is persisted, and retained until the block
-    /// object is destroyed. The file location and offset in write_range_ are valid
-    /// throughout the lifetime of this object, but the data and length in the
-    /// write_range_ are only valid while the block is being written.
-    /// write_range_ instance is owned by the block manager.
-    DiskIoMgr::WriteRange* write_range_;
-
-    /// The file this block belongs to. The lifetime is the same as the file location
-    /// and offset in write_range_. The File is owned by BufferedBlockMgr, not TmpFileMgr.
-    TmpFileMgr::File* tmp_file_;
+    /// Non-NULL when the block data is written to scratch or is in the process of being
+    /// written.
+    std::unique_ptr<TmpFileMgr::WriteHandle> write_handle_;
 
     /// Length of valid (i.e. allocated) data within the block.
     int64_t valid_data_len_;
@@ -241,20 +240,6 @@ class BufferedBlockMgr {
     /// Number of rows in this block.
     int num_rows_;
 
-    /// If --disk_spill_encryption is on, in the write path we allocate a new buffer to
-    /// hold encrypted data while it's being written to disk. In the read path, we can
-    /// instead decrypt data in place since no one else because the read buffer isn't
-    /// accessible to any other threads until Pin() returns.
-    boost::scoped_array<uint8_t> encrypted_write_buffer_;
-
-    /// If --disk_spill_encryption is on, a AES 256-bit key and initialization vector.
-    /// Regenerated on each write.
-    EncryptionKey key_;
-
-    /// If --disk_spill_encryption is on, our hash of the data being written. Filled in
-    /// on writes; verified on reads. This is calculated _after_ encryption.
-    IntegrityHash hash_;
-
     /// Block state variables. The block's buffer can be freed only if is_pinned_ and
     /// in_write_ are both false.
 
@@ -335,8 +320,8 @@ class BufferedBlockMgr {
   ///   - If there is memory pressure, block will get the buffer from 'unpin_block'.
   Status GetNewBlock(Client* client, Block* unpin_block, Block** block, int64_t len = -1);
 
-  /// Cancels the block mgr. All subsequent calls that return a Status fail with
-  /// Status::CANCELLED. Idempotent.
+  /// Test helper to cancel the block mgr. All subsequent calls that return a Status fail
+  /// with Status::CANCELLED. Idempotent.
   void Cancel();
 
   /// Returns true if the block manager was cancelled.
@@ -360,10 +345,6 @@ class BufferedBlockMgr {
   /// ReleaseMemory() call.
   void ReleaseMemory(Client* client, int64_t size);
 
-  /// The number of buffers available for client. That is, if all other clients were
-  /// stopped, the number of buffers this client could get.
-  int64_t available_buffers(Client* client) const;
-
   /// Returns a MEM_LIMIT_EXCEEDED error which includes the minimum memory required by
   /// this 'client' that acts on behalf of the node with id 'node_id'. 'node_id' is used
   /// only for error reporting.
@@ -381,6 +362,7 @@ class BufferedBlockMgr {
   void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; }
 
  private:
+  friend class BufferedBlockMgrTest;
   friend struct Client;
 
   /// Descriptor for a single memory buffer in the pool.
@@ -415,6 +397,12 @@ class BufferedBlockMgr {
   void DeleteBlock(Block* block);
   void DeleteBlockLocked(const boost::unique_lock<boost::mutex>& lock, Block* block);
 
+  /// If there is an in-flight write, cancel the write and restore the contents of the
+  /// block's buffer. If no write has been started for 'block', does nothing. 'block'
+  /// must have an associated buffer. Returns an error status if an error is encountered
+  /// while cancelling the write or CANCELLED if the block mgr is cancelled.
+  Status CancelWrite(Block* block);
+
   /// If the 'block' is NULL, checks if cancelled and returns. Otherwise, depending on
   /// 'unpin' calls either  DeleteBlock() or UnpinBlock(), which both first check for
   /// cancellation. It should be called without the lock_ acquired.
@@ -428,6 +416,10 @@ class BufferedBlockMgr {
   /// The caller should not hold 'lock_'.
   Status TransferBuffer(Block* dst, Block* src, bool unpin);
 
+  /// The number of buffers available for client. That is, if all other clients were
+  /// stopped, the number of buffers this client could get.
+  int64_t available_buffers(Client* client) const;
+
   /// Returns the total number of unreserved buffers. This is the sum of unpinned,
   /// free and buffers we can still allocate minus the total number of reserved buffers
   /// that are not pinned.
@@ -461,8 +453,8 @@ class BufferedBlockMgr {
   /// Issues the write for this block to the DiskIoMgr.
   Status WriteUnpinnedBlock(Block* block);
 
-  /// Wait until either the write for 'block' completes or the block mgr is cancelled.
-  /// 'lock_' must be held with 'lock'.
+  /// Wait until either there is no in-flight write for 'block' or the block mgr is
+  /// cancelled. 'lock_' must be held with 'lock'.
   void WaitForWrite(boost::unique_lock<boost::mutex>& lock, Block* block);
 
   /// Callback used by DiskIoMgr to indicate a block write has completed.  write_status
@@ -481,6 +473,9 @@ class BufferedBlockMgr {
   /// Non-blocking and needs no lock_.
   Block* GetUnusedBlock(Client* client);
 
+  /// Test helper to get the number of block writes currently outstanding.
+  int64_t GetNumWritesOutstanding();
+
   /// Used to debug the state of the block manager. Lock must already be taken.
   bool Validate() const;
   std::string DebugInternal() const;
@@ -559,10 +554,6 @@ class BufferedBlockMgr {
   /// blocks may be written. Blocks are round-robined across these files.
   boost::scoped_ptr<TmpFileMgr::FileGroup> tmp_file_group_;
 
-  /// DiskIoMgr handles to read and write blocks.
-  DiskIoMgr* io_mgr_;
-  DiskIoRequestContext* io_request_context_;
-
   /// If true, a disk write failed and all API calls return.
   /// Status::CANCELLED. Set to true if there was an error writing a block, or if
   /// WriteComplete() needed to reissue the write and that failed.
@@ -584,21 +575,12 @@ class BufferedBlockMgr {
   /// Number of Pin() calls that did not require a disk read.
   RuntimeProfile::Counter* buffered_pin_counter_;
 
-  /// Time taken for disk reads.
-  RuntimeProfile::Counter* disk_read_timer_;
-
   /// Time spent waiting for a free buffer.
   RuntimeProfile::Counter* buffer_wait_timer_;
 
-  /// Number of bytes written to disk (includes writes still queued in the IO manager).
-  RuntimeProfile::Counter* bytes_written_counter_;
-
   /// Number of writes outstanding (issued but not completed).
   RuntimeProfile::Counter* outstanding_writes_counter_;
 
-  /// Time spent in disk spill encryption, decryption, and integrity checking.
-  RuntimeProfile::Counter* encryption_timer_;
-
   /// Number of writes issued.
   int writes_issued_;
 
@@ -609,21 +591,9 @@ class BufferedBlockMgr {
   /// map contains only weak ptrs. BufferedBlockMgrs that are handed out are shared ptrs.
   /// When all the shared ptrs are no longer referenced, the BufferedBlockMgr
   /// d'tor will be called at which point the weak ptr will be removed from the map.
-  typedef boost::unordered_map<TUniqueId, std::weak_ptr<BufferedBlockMgr>>
-      BlockMgrsMap;
+  typedef boost::unordered_map<TUniqueId, std::weak_ptr<BufferedBlockMgr>> BlockMgrsMap;
   static BlockMgrsMap query_to_block_mgrs_;
 
-  /// Takes the data in buffer(), allocates 'encrypted_write_buffer_', encrypts the data
-  /// into 'encrypted_write_buffer_' and computes 'hash_'. Returns a pointer to the
-  /// encrypted data in 'outbuf'.
-  Status EncryptAndHash(Block* block, uint8_t** outbuf);
-
-  /// Deallocates the block's encrypted write buffer alloced in EncryptAndHash().
-  void EncryptedWriteComplete(Block* block);
-
-  /// Verifies the integrity hash and decrypts the contents of buffer() in place.
-  Status CheckHashAndDecrypt(Block* block);
-
   /// Debug option to delay write completion.
   int debug_write_delay_ms_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 9f8d6d7..016b14f 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -239,12 +239,12 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   read_io_mgr.reset();
 }
 
-// Perform invalid writes (e.g. non-existent file, negative offset) and validate
-// that an error status is returned via the write callback.
+// Perform invalid writes (e.g. file in non-existent directory, negative offset) and
+// validate that an error status is returned via the write callback.
 TEST_F(DiskIoMgrTest, InvalidWrite) {
   MemTracker mem_tracker(LARGE_MEM_LIMIT);
   num_ranges_written_ = 0;
-  string tmp_file = "/tmp/non-existent.txt";
+  string tmp_file = "/non-existent/file.txt";
   DiskIoMgr io_mgr(1, 1, 1, 10);
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   DiskIoRequestContext* writer;
@@ -252,12 +252,12 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   int32_t* data = pool_->Add(new int32_t);
   *data = rand();
 
-  // Write to a non-existent file.
+  // Write to file in non-existent directory.
   DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
   DiskIoMgr::WriteRange::WriteDoneCallback callback =
-      bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
-          new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
-          data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
+      bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
+          (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data,
+          Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
   *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
 
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 87ac33a..20cb9b5 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -78,6 +78,8 @@ static const int LOW_MEMORY = 64 * 1024 * 1024;
 
 const int DiskIoMgr::DEFAULT_QUEUE_CAPACITY = 2;
 
+AtomicInt32 DiskIoMgr::next_disk_id_;
+
 namespace detail {
 // Indicates if file handle caching should be used
 static inline bool is_file_handle_caching_enabled() {
@@ -262,6 +264,11 @@ void DiskIoMgr::BufferDescriptor::Return() {
 DiskIoMgr::WriteRange::WriteRange(
     const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback)
   : RequestRange(RequestType::WRITE), callback_(callback) {
+  SetRange(file, file_offset, disk_id);
+}
+
+void DiskIoMgr::WriteRange::SetRange(
+    const std::string& file, int64_t file_offset, int disk_id) {
   file_ = file;
   offset_ = file_offset;
   disk_id_ = disk_id;
@@ -947,8 +954,11 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
   return false;
 }
 
-void DiskIoMgr::HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* write_range,
-    const Status& write_status) {
+void DiskIoMgr::HandleWriteFinished(
+    DiskIoRequestContext* writer, WriteRange* write_range, const Status& write_status) {
+  // Copy disk_id before running callback: the callback may modify write_range.
+  int disk_id = write_range->disk_id_;
+
   // Execute the callback before decrementing the thread count. Otherwise CancelContext()
   // that waits for the disk ref count to be 0 will return, creating a race, e.g.
   // between BufferedBlockMgr::WriteComplete() and BufferedBlockMgr::~BufferedBlockMgr().
@@ -958,7 +968,7 @@ void DiskIoMgr::HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* wr
   {
     unique_lock<mutex> writer_lock(writer->lock_);
     DCHECK(writer->Validate()) << endl << writer->DebugString();
-    DiskIoRequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_];
+    DiskIoRequestContext::PerDiskState& state = writer->disk_states_[disk_id];
     if (writer->state_ == DiskIoRequestContext::Cancelled) {
       state.DecrementRequestThreadAndCheckDone(writer);
     } else {
@@ -1152,13 +1162,24 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange(
 }
 
 void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_range) {
-  FILE* file_handle = fopen(write_range->file(), "rb+");
-  Status ret_status;
-  if (file_handle == NULL) {
+  Status ret_status = Status::OK();
+  FILE* file_handle = NULL;
+  // Raw open() syscall will create file if not present when passed these flags.
+  int fd = open(write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
+  if (fd < 0) {
     ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
-        Substitute("fopen($0, \"rb+\") failed with errno=$1 description=$2",
-            write_range->file_, errno, GetStrErrMsg())));
+        Substitute("Opening '$0' for write failed with errno=$1 description=$2",
+                                     write_range->file_, errno, GetStrErrMsg())));
   } else {
+    file_handle = fdopen(fd, "wb");
+    if (file_handle == NULL) {
+      ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
+          Substitute("fdopen($0, \"wb\") failed with errno=$1 description=$2", fd, errno,
+                                       GetStrErrMsg())));
+    }
+  }
+
+  if (file_handle != NULL) {
     ret_status = WriteRangeHelper(file_handle, write_range);
 
     int success = fclose(file_handle);
@@ -1225,9 +1246,8 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
   // Assign to a local disk queue.
   DCHECK(!IsS3APath(file)); // S3 is always remote.
   if (disk_id == -1) {
-    // disk id is unknown, assign it a random one.
-    static int next_disk_id = 0;
-    disk_id = next_disk_id++;
+    // disk id is unknown, assign it an arbitrary one.
+    disk_id = next_disk_id_.Add(1);
   }
   // TODO: we need to parse the config for the number of dirs configured for this
   // data node.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index 4b650a8..c67f69d 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_H
 #define IMPALA_RUNTIME_DISK_IO_MGR_H
 
+#include <functional>
 #include <list>
 #include <vector>
 
@@ -599,14 +599,22 @@ class DiskIoMgr {
     /// (TStatusCode::CANCELLED). The callback is only invoked if this WriteRange was
     /// successfully added (i.e. AddWriteRange() succeeded). No locks are held while
     /// the callback is invoked.
-    typedef boost::function<void (const Status&)> WriteDoneCallback;
+    typedef std::function<void(const Status&)> WriteDoneCallback;
     WriteRange(const std::string& file, int64_t file_offset, int disk_id,
         WriteDoneCallback callback);
 
+    /// Sets the file, offset and disk id of this range. Data and callback are unchanged.
+    /// Can only be called when the write is not in flight (i.e. before AddWriteRange()
+    /// is called or after the write callback was called).
+    void SetRange(const std::string& file, int64_t file_offset, int disk_id);
+
     /// Set the data and number of bytes to be written for this WriteRange.
-    /// File data can be over-written by calling SetData() and AddWriteRange().
+    /// Can only be called when the write is not in flight (i.e. before AddWriteRange()
+    /// is called or after the write callback was called).
     void SetData(const uint8_t* buffer, int64_t len);
 
+    const uint8_t* data() const { return data_; }
+
    private:
     friend class DiskIoMgr;
     friend class DiskIoRequestContext;
@@ -855,6 +863,11 @@ class DiskIoMgr {
   /// It is indexed by disk id.
   std::vector<DiskQueue*> disk_queues_;
 
+  /// The next disk queue to write to if the actual 'disk_id_' is unknown (i.e. the file
+  /// is not associated with a particular local disk or remote queue). Used to implement
+  /// round-robin assignment for that case.
+  static AtomicInt32 next_disk_id_;
+
   // Caching structure that maps file names to cached file handles. The cache has an upper
   // limit of entries defined by FLAGS_max_cached_file_handles. Evicted cached file
   // handles are closed.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index ad996e3..d93a459 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -171,7 +171,7 @@ ExecEnv::ExecEnv()
     scheduler_.reset(new SimpleScheduler(
         addresses, metrics_.get(), webserver_.get(), request_pool_service_.get()));
   }
-  if (exec_env_ == NULL) exec_env_ = this;
+  exec_env_ = this;
 }
 
 // TODO: Need refactor to get rid of duplicated code.
@@ -224,12 +224,10 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
     scheduler_.reset(new SimpleScheduler(
         addresses, metrics_.get(), webserver_.get(), request_pool_service_.get()));
   }
-  if (exec_env_ == NULL) exec_env_ = this;
+  exec_env_ = this;
 }
 
-
-ExecEnv::~ExecEnv() {
-}
+ExecEnv::~ExecEnv() {}
 
 Status ExecEnv::InitForFeTests() {
   mem_tracker_.reset(new MemTracker(-1, "Process"));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 08ddd9f..be90a5a 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -61,7 +61,7 @@ class ExecEnv {
 
   /// Returns the first created exec env instance. In a normal impalad, this is
   /// the only instance. In test setups with multiple ExecEnv's per process,
-  /// we return the first instance.
+  /// we return the most recently created instance.
   static ExecEnv* GetInstance() { return exec_env_; }
 
   /// Empty destructor because the compiler-generated one requires full

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 2757750..def95c0 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -45,8 +45,6 @@ QueryState::QueryState(const TQueryCtx& query_ctx)
   // how many are distinct. It is defined as the sum of the number of generic errors and
   // the number of distinct other errors.
   if (query_options.max_errors <= 0) {
-    // TODO: fix linker error and uncomment this
-    //query_options_.max_errors = FLAGS_max_errors;
     query_options.max_errors = 100;
   }
   if (query_options.batch_size <= 0) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
new file mode 100644
index 0000000..dd8bd07
--- /dev/null
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_TMP_FILE_MGR_INTERNAL_H
+#define IMPALA_RUNTIME_TMP_FILE_MGR_INTERNAL_H
+
+#include <string>
+
+#include "runtime/tmp-file-mgr.h"
+
+namespace impala {
+
+/// File is a handle to a physical file in a temporary directory. File space
+/// can be allocated and files removed using AllocateSpace() and Remove(). Used
+/// internally by TmpFileMgr.
+///
+/// Creation of the physical file in the file system is deferred until the file is
+/// written by DiskIoMgr.
+///
+/// Methods of File are not thread-safe.
+class TmpFileMgr::File {
+ public:
+  File(FileGroup* file_group, DeviceId device_id, const std::string& path);
+
+  /// Allocates 'num_bytes' bytes in this file for a new block of data.
+  /// This only reserves a range of offsets; creation of the physical file is deferred.
+  /// Returns Status::OK() and sets 'offset' to the file offset of the first
+  /// byte in the allocated range on success.
+  /// Returns an error status if an unexpected error occurs, e.g. the file could not
+  /// be created.
+  Status AllocateSpace(int64_t num_bytes, int64_t* offset);
+
+  /// Called when an IO error is encountered for this file. Logs the error and blacklists
+  /// the file.
+  void Blacklist(const ErrorMsg& msg);
+
+  /// Delete the physical file on disk, if one was created.
+  /// It is not valid to read or write to a file after calling Remove().
+  Status Remove();
+
+  /// Get the disk ID that should be used for IO mgr queueing.
+  int AssignDiskQueue() const;
+
+  const std::string& path() const { return path_; }
+  bool is_blacklisted() const { return blacklisted_; }
+
+  std::string DebugString();
+
+ private:
+  friend class TmpFileMgrTest;
+  /// The name of the sub-directory that Impala creates within each configured scratch
+  /// directory.
+  const static std::string TMP_SUB_DIR_NAME;
+
+  /// Space (in MB) that must ideally be available for writing on a scratch
+  /// directory. A warning is issued if available space is less than this threshold.
+  const static uint64_t AVAILABLE_SPACE_THRESHOLD_MB;
+
+  /// The FileGroup this belongs to. Cannot be null.
+  FileGroup* const file_group_;
+
+  /// Path of the physical file in the filesystem.
+  const std::string path_;
+
+  /// The temporary device this file is stored on.
+  const DeviceId device_id_;
+
+  /// The id of the disk on which the physical file lies.
+  const int disk_id_;
+
+  /// Current bytes allocated in the file. Modified by AllocateSpace().
+  int64_t bytes_allocated_;
+
+  /// Set to true to indicate that we shouldn't allocate any more space in this file.
+  bool blacklisted_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 4deefd5..61fd682 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -19,12 +19,16 @@
 
 #include <boost/filesystem.hpp>
 #include <boost/scoped_ptr.hpp>
+#include <boost/thread/locks.hpp>
 #include <gtest/gtest.h>
 
 #include "common/init.h"
+#include "runtime/test-env.h"
+#include "runtime/tmp-file-mgr-internal.h"
 #include "runtime/tmp-file-mgr.h"
 #include "service/fe-support.h"
 #include "testutil/gtest-util.h"
+#include "util/condition-variable.h"
 #include "util/filesystem-util.h"
 #include "util/metrics.h"
 
@@ -37,22 +41,27 @@ using boost::filesystem::path;
 namespace impala {
 
 class TmpFileMgrTest : public ::testing::Test {
- protected:
+ public:
   virtual void SetUp() {
     metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
     profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "tmp-file-mgr-test"));
+    test_env_.reset(new TestEnv);
+    cb_counter_ = 0;
   }
 
   virtual void TearDown() {
+    test_env_.reset();
     metrics_.reset();
     obj_pool_.Clear();
   }
 
+  DiskIoMgr* io_mgr() { return test_env_->exec_env()->disk_io_mgr(); }
+
   /// Check that metric values are consistent with TmpFileMgr state.
   void CheckMetrics(TmpFileMgr* tmp_file_mgr) {
-    vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->active_tmp_devices();
-    IntGauge* active_metric = metrics_->FindMetricForTesting<IntGauge>(
-        "tmp-file-mgr.active-scratch-dirs");
+    vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->ActiveTmpDevices();
+    IntGauge* active_metric =
+        metrics_->FindMetricForTesting<IntGauge>("tmp-file-mgr.active-scratch-dirs");
     EXPECT_EQ(active.size(), active_metric->value());
     SetMetric<string>* active_set_metric =
         metrics_->FindMetricForTesting<SetMetric<string>>(
@@ -71,54 +80,118 @@ class TmpFileMgrTest : public ::testing::Test {
     }
   }
 
-  /// Helper to call the private NewFile() method.
-  static Status NewFile(TmpFileMgr::FileGroup* group,
-      const TmpFileMgr::DeviceId& device_id, const TUniqueId& query_id,
-      TmpFileMgr::File** new_file) {
-    return group->NewFile(device_id, query_id, new_file);
+  /// Helper to call the private CreateFiles() method and return
+  /// the created files.
+  static Status CreateFiles(
+      TmpFileMgr::FileGroup* group, vector<TmpFileMgr::File*>* files) {
+    // The method expects the lock to be held.
+    lock_guard<SpinLock> lock(group->lock_);
+    RETURN_IF_ERROR(group->CreateFiles());
+    for (unique_ptr<TmpFileMgr::File>& file : group->tmp_files_) {
+      files->push_back(file.get());
+    }
+    return Status::OK();
+  }
+
+  /// Helper to call the private TmpFileMgr::NewFile() method.
+  static Status NewFile(TmpFileMgr* mgr, TmpFileMgr::FileGroup* group,
+      TmpFileMgr::DeviceId device_id, unique_ptr<TmpFileMgr::File>* new_file) {
+    return mgr->NewFile(group, device_id, new_file);
   }
 
-  /// Helper to call the private TmpFile::AllocateSpace() method.
-  static Status AllocateSpace(
+  /// Helper to call the private File::AllocateSpace() method.
+  static Status FileAllocateSpace(
       TmpFileMgr::File* file, int64_t num_bytes, int64_t* offset) {
     return file->AllocateSpace(num_bytes, offset);
   }
 
+  /// Helper to call the private FileGroup::AllocateSpace() method.
+  static Status GroupAllocateSpace(TmpFileMgr::FileGroup* group, int64_t num_bytes,
+      TmpFileMgr::File** file, int64_t* offset) {
+    return group->AllocateSpace(num_bytes, file, offset);
+  }
+
+  /// Helper to set FileGroup::next_allocation_index_.
+  static void SetNextAllocationIndex(TmpFileMgr::FileGroup* group, int value) {
+    group->next_allocation_index_ = value;
+  }
+
+  /// Helper to get the # of bytes allocated by the group. Validates that the sum across
+  /// all files equals this total.
+  static int64_t BytesAllocated(TmpFileMgr::FileGroup* group) {
+    int64_t bytes_allocated = 0;
+    for (unique_ptr<TmpFileMgr::File>& file : group->tmp_files_) {
+      bytes_allocated += file->bytes_allocated_;
+    }
+    EXPECT_EQ(bytes_allocated, group->current_bytes_allocated_);
+    return bytes_allocated;
+  }
+
+  // Write callback, which signals 'cb_cv_' and increments 'cb_counter_'.
+  void SignalCallback(Status write_status) {
+    {
+      lock_guard<mutex> lock(cb_cv_lock_);
+      ++cb_counter_;
+    }
+    cb_cv_.NotifyAll();
+  }
+
+  /// Wait until 'cb_counter_' reaches 'val'.
+  void WaitForCallbacks(int64_t val) {
+    unique_lock<mutex> lock(cb_cv_lock_);
+    while (cb_counter_ < val) cb_cv_.Wait(lock);
+  }
+
   ObjectPool obj_pool_;
   scoped_ptr<MetricGroup> metrics_;
   // Owned by 'obj_pool_'.
   RuntimeProfile* profile_;
+
+  /// Used for DiskIoMgr.
+  scoped_ptr<TestEnv> test_env_;
+
+  // Variables used by SignalCallback().
+  mutex cb_cv_lock_;
+  ConditionVariable cb_cv_;
+  int64_t cb_counter_;
 };
 
 /// Regression test for IMPALA-2160. Verify that temporary file manager allocates blocks
-/// at the expected file offsets and expands the temporary file to the correct size.
+/// at the expected file offsets.
 TEST_F(TmpFileMgrTest, TestFileAllocation) {
   TmpFileMgr tmp_file_mgr;
   ASSERT_OK(tmp_file_mgr.Init(metrics_.get()));
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_);
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
+
   // Default configuration should give us one temporary device.
-  EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices());
-  vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.active_tmp_devices();
+  EXPECT_EQ(1, tmp_file_mgr.NumActiveTmpDevices());
+  vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.ActiveTmpDevices();
   EXPECT_EQ(1, tmp_devices.size());
-  TUniqueId id;
-  TmpFileMgr::File* file;
-  ASSERT_OK(NewFile(&file_group, tmp_devices[0], id, &file));
-  EXPECT_TRUE(file != NULL);
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
+  EXPECT_EQ(1, files.size());
+  TmpFileMgr::File* file = files[0];
   // Apply writes of variable sizes and check space was allocated correctly.
   int64_t write_sizes[] = {1, 10, 1024, 4, 1024 * 1024 * 8, 1024 * 1024 * 8, 16, 10};
   int num_write_sizes = sizeof(write_sizes) / sizeof(write_sizes[0]);
   int64_t next_offset = 0;
   for (int i = 0; i < num_write_sizes; ++i) {
     int64_t offset;
-    ASSERT_OK(AllocateSpace(file, write_sizes[i], &offset));
+    ASSERT_OK(FileAllocateSpace(file, write_sizes[i], &offset));
     EXPECT_EQ(next_offset, offset);
     next_offset = offset + write_sizes[i];
-    EXPECT_EQ(next_offset, boost::filesystem::file_size(file->path()));
   }
   // Check that cleanup is correct.
   string file_path = file->path();
-  file_group.Close();
   EXPECT_FALSE(boost::filesystem::exists(file_path));
+
+  // Check that the file is cleaned up correctly. Need to create file first since
+  // tmp file is only allocated on writes.
+  EXPECT_OK(FileSystemUtil::CreateFile(file->path()));
+  file_group.Close();
+  EXPECT_FALSE(boost::filesystem::exists(file->path()));
   CheckMetrics(&tmp_file_mgr);
 }
 
@@ -129,15 +202,18 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, true, metrics_.get());
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_);
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
 
   // Only the first directory should be used.
-  EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices());
-  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices();
+  EXPECT_EQ(1, tmp_file_mgr.NumActiveTmpDevices());
+  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices();
   EXPECT_EQ(1, devices.size());
-  TUniqueId id;
-  TmpFileMgr::File* file;
-  ASSERT_OK(NewFile(&file_group, devices[0], id, &file));
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
+  EXPECT_EQ(1, files.size());
+  TmpFileMgr::File* file = files[0];
   // Check the prefix is the expected temporary directory.
   EXPECT_EQ(0, file->path().find(tmp_dirs[0]));
   FileSystemUtil::RemovePaths(tmp_dirs);
@@ -151,19 +227,22 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_);
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
 
   // Both directories should be used.
-  EXPECT_EQ(2, tmp_file_mgr.num_active_tmp_devices());
-  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices();
+  EXPECT_EQ(2, tmp_file_mgr.NumActiveTmpDevices());
+  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices();
   EXPECT_EQ(2, devices.size());
-  for (int i = 0; i < tmp_dirs.size(); ++i) {
+
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
+  EXPECT_EQ(2, files.size());
+  for (int i = 0; i < 2; ++i) {
     EXPECT_EQ(0, tmp_file_mgr.GetTmpDirPath(devices[i]).find(tmp_dirs[i]));
-    TUniqueId id;
-    TmpFileMgr::File* file;
-    ASSERT_OK(NewFile(&file_group, devices[i], id, &file));
     // Check the prefix is the expected temporary directory.
-    EXPECT_EQ(0, file->path().find(tmp_dirs[i]));
+    EXPECT_EQ(0, files[i]->path().find(tmp_dirs[i]));
   }
   FileSystemUtil::RemovePaths(tmp_dirs);
   file_group.Close();
@@ -171,79 +250,79 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
 }
 
 /// Test that reporting a write error is possible but does not result in
-/// blacklisting, which is disabled.
+/// blacklisting the device.
 TEST_F(TmpFileMgrTest, TestReportError) {
   vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_);
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
 
   // Both directories should be used.
-  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices();
+  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices();
   EXPECT_EQ(2, devices.size());
   CheckMetrics(&tmp_file_mgr);
 
   // Inject an error on one device so that we can validate it is handled correctly.
-  TUniqueId id;
   int good_device = 0, bad_device = 1;
-  TmpFileMgr::File* bad_file;
-  ASSERT_OK(NewFile(&file_group, devices[bad_device], id, &bad_file));
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
+  ASSERT_EQ(2, files.size());
+  TmpFileMgr::File* good_file = files[good_device];
+  TmpFileMgr::File* bad_file = files[bad_device];
   ErrorMsg errmsg(TErrorCode::GENERAL, "A fake error");
-  bad_file->ReportIOError(errmsg);
+  bad_file->Blacklist(errmsg);
 
-  // Blacklisting is disabled.
-  EXPECT_FALSE(bad_file->is_blacklisted());
-  // The second device should still be active.
-  EXPECT_EQ(2, tmp_file_mgr.num_active_tmp_devices());
-  vector<TmpFileMgr::DeviceId> devices_after = tmp_file_mgr.active_tmp_devices();
+  // File-level blacklisting is enabled but not device-level.
+  EXPECT_TRUE(bad_file->is_blacklisted());
+  // The bad device should still be active.
+  EXPECT_EQ(2, tmp_file_mgr.NumActiveTmpDevices());
+  vector<TmpFileMgr::DeviceId> devices_after = tmp_file_mgr.ActiveTmpDevices();
   EXPECT_EQ(2, devices_after.size());
   CheckMetrics(&tmp_file_mgr);
 
   // Attempts to expand bad file should succeed.
   int64_t offset;
-  ASSERT_OK(AllocateSpace(bad_file, 128, &offset));
+  ASSERT_OK(FileAllocateSpace(bad_file, 128, &offset));
   // The good device should still be usable.
-  TmpFileMgr::File* good_file;
-  ASSERT_OK(NewFile(&file_group, devices[good_device], id, &good_file));
-  EXPECT_TRUE(good_file != NULL);
-  ASSERT_OK(AllocateSpace(good_file, 128, &offset));
+  ASSERT_OK(FileAllocateSpace(good_file, 128, &offset));
   // Attempts to allocate new files on bad device should succeed.
-  ASSERT_OK(NewFile(&file_group, devices[bad_device], id, &bad_file));
+  unique_ptr<TmpFileMgr::File> bad_file2;
+  ASSERT_OK(NewFile(&tmp_file_mgr, &file_group, bad_device, &bad_file2));
   FileSystemUtil::RemovePaths(tmp_dirs);
   file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
 
-TEST_F(TmpFileMgrTest, TestAllocateFails) {
-  string tmp_dir("/tmp/tmp-file-mgr-test.1");
-  string scratch_subdir = tmp_dir + "/impala-scratch";
-  vector<string> tmp_dirs({tmp_dir});
+TEST_F(TmpFileMgrTest, TestAllocateNonWritable) {
+  vector<string> tmp_dirs;
+  vector<string> scratch_subdirs;
+  for (int i = 0; i < 2; ++i) {
+    tmp_dirs.push_back(Substitute("/tmp/tmp-file-mgr-test.$0", i));
+    scratch_subdirs.push_back(tmp_dirs[i] + "/impala-scratch");
+  }
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_);
-
   TUniqueId id;
-  TmpFileMgr::File* allocated_file1;
-  TmpFileMgr::File* allocated_file2;
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);
+
+  vector<TmpFileMgr::File*> allocated_files;
+  ASSERT_OK(CreateFiles(&file_group, &allocated_files))
   int64_t offset;
-  ASSERT_OK(NewFile(&file_group, 0, id, &allocated_file1));
-  ASSERT_OK(NewFile(&file_group, 0, id, &allocated_file2));
-  ASSERT_OK(AllocateSpace(allocated_file1, 1, &offset));
+  ASSERT_OK(FileAllocateSpace(allocated_files[0], 1, &offset));
 
-  // Make scratch non-writable and test for allocation errors at different stages:
+  // Make scratch non-writable and test allocation at different stages:
   // new file creation, files with no allocated blocks. files with allocated space.
-  chmod(scratch_subdir.c_str(), 0);
-  // allocated_file1 already has space allocated.
-  EXPECT_FALSE(AllocateSpace(allocated_file1, 1, &offset).ok());
-  // allocated_file2 has no space allocated.
-  EXPECT_FALSE(AllocateSpace(allocated_file2, 1, &offset).ok());
-  // Creating a new File object can succeed because it is not immediately created on disk.
-  TmpFileMgr::File* unallocated_file;
-  ASSERT_OK(NewFile(&file_group, 0, id, &unallocated_file));
-
-  chmod(scratch_subdir.c_str(), S_IRWXU);
+  // No errors should be encountered during allocation since allocation is purely logical.
+  chmod(scratch_subdirs[0].c_str(), 0);
+  ASSERT_OK(FileAllocateSpace(allocated_files[0], 1, &offset));
+  ASSERT_OK(FileAllocateSpace(allocated_files[1], 1, &offset));
+
+  chmod(scratch_subdirs[0].c_str(), S_IRWXU);
   FileSystemUtil::RemovePaths(tmp_dirs);
   file_group.Close();
 }
@@ -256,51 +335,86 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) {
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
 
   const int64_t LIMIT = 100;
-  const int64_t FILE1_ALLOC = 25;
-  const int64_t FILE2_ALLOC = LIMIT - FILE1_ALLOC;
-  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, profile_, LIMIT);
-  TmpFileMgr::File* file1;
-  TmpFileMgr::File* file2;
+  const int64_t ALLOC_SIZE = 50;
   TUniqueId id;
-  ASSERT_OK(NewFile(&file_group, 0, id, &file1));
-  ASSERT_OK(NewFile(&file_group, 1, id, &file2));
+  TmpFileMgr::FileGroup file_group(
+      &tmp_file_mgr, io_mgr(), profile_, id, ALLOC_SIZE, LIMIT);
+
+  vector<TmpFileMgr::File*> files;
+  ASSERT_OK(CreateFiles(&file_group, &files));
 
   // Test individual limit is enforced.
   Status status;
   int64_t offset;
   TmpFileMgr::File* alloc_file;
-  // Alloc from both files should fail.
-  for (int i = 0; i <= 1; ++i) {
-    status = file_group.AllocateSpace(LIMIT + 1, &alloc_file, &offset);
-    ASSERT_FALSE(status.ok());
-    ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
-  }
 
   // Alloc from file 1 should succeed.
-  ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
-  ASSERT_OK(file_group.AllocateSpace(FILE1_ALLOC, &alloc_file, &offset));
-  ASSERT_EQ(alloc_file, file1); // Should select files round-robin.
+  SetNextAllocationIndex(&file_group, 0);
+  ASSERT_OK(GroupAllocateSpace(&file_group, ALLOC_SIZE, &alloc_file, &offset));
+  ASSERT_EQ(alloc_file, files[0]); // Should select files round-robin.
   ASSERT_EQ(0, offset);
 
-  // Test aggregate limit is enforced on both files.
-  for (int i = 0; i <= 1; ++i) {
-    status = file_group.AllocateSpace(FILE2_ALLOC + 1, &alloc_file, &offset);
-    ASSERT_FALSE(status.ok());
-    ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
-  }
-
   // Allocate up to the max.
-  ASSERT_OK(file_group.AllocateSpace(FILE2_ALLOC, &alloc_file, &offset));
+  ASSERT_OK(GroupAllocateSpace(&file_group, ALLOC_SIZE, &alloc_file, &offset));
   ASSERT_EQ(0, offset);
-  ASSERT_EQ(alloc_file, file2);
+  ASSERT_EQ(alloc_file, files[1]);
 
-  // Test aggregate limit still enforced.
-  status = file_group.AllocateSpace(1, &alloc_file, &offset);
+  // Test aggregate limit is enforced.
+  status = GroupAllocateSpace(&file_group, 1, &alloc_file, &offset);
   ASSERT_FALSE(status.ok());
   ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
 
   file_group.Close();
 }
+
+// Test that scratch ranges are recycled: bytes allocated stay constant across iterations.
+TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
+  const int64_t ALLOC_SIZE = 50;
+  TUniqueId id;
+  TmpFileMgr::FileGroup file_group(
+      test_env_->tmp_file_mgr(), io_mgr(), profile_, id, ALLOC_SIZE);
+
+  // Generate BLOCKS buffers of ALLOC_SIZE bytes; buffer i holds i, i + 1, i + 2, ...
+  const int BLOCKS = 5;
+  vector<vector<uint8_t>> data(BLOCKS);
+  for (int i = 0; i < BLOCKS; ++i) {
+    data[i].resize(ALLOC_SIZE);
+    std::iota(data[i].begin(), data[i].end(), i);
+  }
+
+  DiskIoMgr::WriteRange::WriteDoneCallback callback =
+      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
+  vector<unique_ptr<TmpFileMgr::WriteHandle>> handles(BLOCKS);
+  // Run several iterations to check that the bytes allocated do not grow between them.
+  const int TEST_ITERS = 5;
+  for (int i = 0; i < TEST_ITERS; ++i) {
+    cb_counter_ = 0;
+    for (int j = 0; j < BLOCKS; ++j) {
+      ASSERT_OK(
+          file_group.Write(MemRange(data[j].data(), ALLOC_SIZE), callback, &handles[j]));
+    }
+    WaitForCallbacks(BLOCKS); // presumably blocks until all BLOCKS writes signalled - confirm
+    EXPECT_EQ(ALLOC_SIZE * BLOCKS, BytesAllocated(&file_group));
+
+    // Read each block back, check it matches what was written, then destroy its handle.
+    for (int j = 0; j < BLOCKS; ++j) {
+      uint8_t tmp[ALLOC_SIZE];
+      ASSERT_OK(file_group.Read(handles[j].get(), MemRange(tmp, ALLOC_SIZE)));
+      EXPECT_EQ(0, memcmp(tmp, data[j].data(), ALLOC_SIZE));
+      file_group.DestroyWriteHandle(move(handles[j]));
+    }
+    // Check that the space is still in use - it should be recycled by the next iteration.
+    EXPECT_EQ(ALLOC_SIZE * BLOCKS, BytesAllocated(&file_group));
+  }
+
+  file_group.Close();
+  test_env_->TearDownRuntimeStates();
+}
 }
 
-IMPALA_TEST_MAIN();
+int main(int argc, char** argv) { // Explicit entry point; replaces IMPALA_TEST_MAIN().
+  ::testing::InitGoogleTest(&argc, argv); // Let gtest parse its own flags first.
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport(); // NOTE(review): presumably for TestEnv users - confirm.
+  return RUN_ALL_TESTS();
+}