You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/04/12 23:18:37 UTC

[03/50] incubator-impala git commit: Add AtomicInt32/AtomicInt64 typedefs

Add AtomicInt32/AtomicInt64 typedefs

Only certain primitive types can be used with AtomicInt<>, so let's
treat AtomicInt<> as an implementation detail and instead expose
typedefs for the Atomic integer sizes we support. Convert all decls to
use the new typedefs.

Change-Id: I8b91ba684aabc67ed1721c7b7320aa42049268c8
Reviewed-on: http://gerrit.cloudera.org:8080/2601
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/10e7de79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/10e7de79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/10e7de79

Branch: refs/heads/master
Commit: 10e7de7969f96fe3edb00fcab3f7a8c432bd20a0
Parents: c039791
Author: Dan Hecht <dh...@cloudera.com>
Authored: Tue Mar 22 14:24:23 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Mar 23 22:29:18 2016 +0000

----------------------------------------------------------------------
 be/src/common/atomic-test.cc            |  3 ++-
 be/src/common/atomic.h                  | 12 +++++++++++-
 be/src/exec/hdfs-scan-node.h            |  8 ++++----
 be/src/rpc/rpc-trace.h                  |  2 +-
 be/src/runtime/data-stream-recvr.h      |  2 +-
 be/src/runtime/disk-io-mgr-internal.h   | 24 ++++++++++++------------
 be/src/runtime/disk-io-mgr-test.cc      | 18 +++++++++---------
 be/src/runtime/disk-io-mgr.h            |  4 ++--
 be/src/runtime/lib-cache.h              |  2 +-
 be/src/runtime/mem-tracker.cc           |  2 +-
 be/src/runtime/mem-tracker.h            |  2 +-
 be/src/runtime/plan-fragment-executor.h |  2 +-
 be/src/scheduling/query-resource-mgr.cc |  8 ++++----
 be/src/scheduling/query-resource-mgr.h  |  8 ++++----
 be/src/util/counting-barrier.h          |  2 +-
 be/src/util/hdfs-bulk-ops.h             |  2 +-
 be/src/util/internal-queue-test.cc      |  6 +++---
 be/src/util/periodic-counter-updater.h  |  2 +-
 be/src/util/progress-updater.h          |  4 ++--
 be/src/util/runtime-profile.h           |  4 ++--
 20 files changed, 64 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/common/atomic-test.cc
----------------------------------------------------------------------
diff --git a/be/src/common/atomic-test.cc b/be/src/common/atomic-test.cc
index 5715022..4e4559c 100644
--- a/be/src/common/atomic-test.cc
+++ b/be/src/common/atomic-test.cc
@@ -20,9 +20,10 @@
 
 #include "common/names.h"
 
-
 namespace impala {
 
+using namespace internal; // Testing AtomicInt<> directly.
+
 // Simple test to make sure there is no obvious error in the operations.  This is not
 // intended to test the thread safety.
 template<typename T>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/common/atomic.h
----------------------------------------------------------------------
diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h
index 07a9036..b0b8faa 100644
--- a/be/src/common/atomic.h
+++ b/be/src/common/atomic.h
@@ -45,7 +45,10 @@ class AtomicUtil {
   }
 };
 
-/// Atomic integer. 'T' can be either 32-bit or 64-bit signed integer. Each operation
+namespace internal {
+
+/// Atomic integer. This class template should not be used directly; instead use the
+/// typedefs below. 'T' can be either 32-bit or 64-bit signed integer. Each operation
 /// is performed atomically and has a specified memory-ordering semantic:
 ///
 /// Acquire: these operations ensure no later memory access by the same thread can be
@@ -102,6 +105,13 @@ class AtomicInt {
   DISALLOW_COPY_AND_ASSIGN(AtomicInt);
 };
 
+} // namespace internal
+
+/// Supported atomic types. Use these types rather than referring to AtomicInt<>
+/// directly.
+typedef internal::AtomicInt<int32_t> AtomicInt32;
+typedef internal::AtomicInt<int64_t> AtomicInt64;
+
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 4ddd33d..73b9527 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -369,7 +369,7 @@ class HdfsScanNode : public ScanNode {
   int64_t scanner_thread_bytes_required_;
 
   /// Number of files that have not been issued from the scanners.
-  AtomicInt<int> num_unqueued_files_;
+  AtomicInt32 num_unqueued_files_;
 
   /// Map of HdfsScanner objects to file types.  Only one scanner object will be
   /// created for each file type.  Objects stored in runtime_state's pool.
@@ -424,12 +424,12 @@ class HdfsScanNode : public ScanNode {
   /// This is the number of io buffers that are owned by the scan node and the scanners.
   /// This is used just to help debug leaked io buffers to determine if the leak is
   /// happening in the scanners vs other parts of the execution.
-  AtomicInt<int> num_owned_io_buffers_;
+  AtomicInt32 num_owned_io_buffers_;
 
   /// Counters which track the number of scanners that have codegen enabled for the
   /// materialize and conjuncts evaluation code paths.
-  AtomicInt<int> num_scanners_codegen_enabled_;
-  AtomicInt<int> num_scanners_codegen_disabled_;
+  AtomicInt32 num_scanners_codegen_enabled_;
+  AtomicInt32 num_scanners_codegen_disabled_;
 
   /// The size of the largest compressed text file to be scanned. This is used to
   /// estimate scanner thread memory usage.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/rpc/rpc-trace.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-trace.h b/be/src/rpc/rpc-trace.h
index aed05f4..5abe0a2 100644
--- a/be/src/rpc/rpc-trace.h
+++ b/be/src/rpc/rpc-trace.h
@@ -84,7 +84,7 @@ class RpcEventHandler : public apache::thrift::TProcessorEventHandler {
     StatsMetric<double>* time_stats;
 
     /// Number of invocations in flight
-    AtomicInt<int32_t> num_in_flight;
+    AtomicInt32 num_in_flight;
   };
 
   /// Map from method name to descriptor

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
index 7879c44..2a3b184 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -136,7 +136,7 @@ class DataStreamRecvr {
   bool is_merging_;
 
   /// total number of bytes held across all sender queues.
-  AtomicInt<int> num_buffered_bytes_;
+  AtomicInt32 num_buffered_bytes_;
 
   /// Memtracker for batches in the sender queue(s).
   boost::scoped_ptr<MemTracker> mem_tracker_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
index fe607e7..e58a3f3 100644
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ b/be/src/runtime/disk-io-mgr-internal.h
@@ -212,48 +212,48 @@ class DiskIoMgr::RequestContext {
   RuntimeProfile::Counter* disks_accessed_bitmap_;
 
   /// Total number of bytes read locally, updated at end of each range scan
-  AtomicInt<int64_t> bytes_read_local_;
+  AtomicInt64 bytes_read_local_;
 
   /// Total number of bytes read via short circuit read, updated at end of each range scan
-  AtomicInt<int64_t> bytes_read_short_circuit_;
+  AtomicInt64 bytes_read_short_circuit_;
 
   /// Total number of bytes read from date node cache, updated at end of each range scan
-  AtomicInt<int64_t> bytes_read_dn_cache_;
+  AtomicInt64 bytes_read_dn_cache_;
 
   /// Total number of bytes from remote reads that were expected to be local.
-  AtomicInt<int64_t> unexpected_remote_bytes_;
+  AtomicInt64 unexpected_remote_bytes_;
 
   /// The number of buffers that have been returned to the reader (via GetNext) that the
   /// reader has not returned. Only included for debugging and diagnostics.
-  AtomicInt<int> num_buffers_in_reader_;
+  AtomicInt32 num_buffers_in_reader_;
 
   /// The number of scan ranges that have been completed for this reader.
-  AtomicInt<int> num_finished_ranges_;
+  AtomicInt32 num_finished_ranges_;
 
   /// The number of scan ranges that required a remote read, updated at the end of each
   /// range scan. Only used for diagnostics.
-  AtomicInt<int> num_remote_ranges_;
+  AtomicInt32 num_remote_ranges_;
 
   /// The total number of scan ranges that have not been started. Only used for
   /// diagnostics. This is the sum of all unstarted_scan_ranges across all disks.
-  AtomicInt<int> num_unstarted_scan_ranges_;
+  AtomicInt32 num_unstarted_scan_ranges_;
 
   /// The number of buffers that are being used for this reader. This is the sum
   /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
   /// to be queued).
-  AtomicInt<int> num_used_buffers_;
+  AtomicInt32 num_used_buffers_;
 
   /// The total number of ready buffers across all ranges.  Ready buffers are buffers
   /// that have been read from disk but not retrieved by the caller.
   /// This is the sum of all queued buffers in all ranges for this reader context.
-  AtomicInt<int> num_ready_buffers_;
+  AtomicInt32 num_ready_buffers_;
 
   /// The total (sum) of queue capacities for finished scan ranges. This value
   /// divided by num_finished_ranges_ is the average for finished ranges and
   /// used to seed the starting queue capacity for future ranges. The assumption
   /// is that if previous ranges were fast, new ones will be fast too. The scan
   /// range adjusts the queue capacity dynamically so a rough approximation will do.
-  AtomicInt<int> total_range_queue_capacity_;
+  AtomicInt32 total_range_queue_capacity_;
 
   /// The initial queue size for new scan ranges. This is always
   /// total_range_queue_capacity_ / num_finished_ranges_ but stored as a separate
@@ -447,7 +447,7 @@ class DiskIoMgr::RequestContext {
     /// entire operation, we need this ref count to keep track of which thread should do
     /// final resource cleanup during cancellation.
     /// Only the thread that sees the count at 0 should do the final cleanup.
-    AtomicInt<int> num_threads_in_op_;
+    AtomicInt32 num_threads_in_op_;
 
     /// Queue of write ranges to process for this disk. A write range is always added
     /// to in_flight_ranges_ in GetNextRequestRange(). There is a separate

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 806a036..ee89f56 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -136,7 +136,7 @@ class DiskIoMgrTest : public testing::Test {
   // Updates num_ranges_processed with the number of ranges seen by this thread.
   static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
       const char* expected_result, int expected_len, const Status& expected_status,
-      int max_ranges, AtomicInt<int>* num_ranges_processed) {
+      int max_ranges, AtomicInt32* num_ranges_processed) {
     int num_ranges = 0;
     while (max_ranges == 0 || num_ranges < max_ranges) {
       DiskIoMgr::ScanRange* range;
@@ -373,7 +373,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
           }
           ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
 
-          AtomicInt<int> num_ranges_processed;
+          AtomicInt32 num_ranges_processed;
           thread_group threads;
           for (int i = 0; i < num_read_threads; ++i) {
             threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
@@ -432,7 +432,7 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
                 stat_val.st_mtime));
           }
         }
-        AtomicInt<int> num_ranges_processed;
+        AtomicInt32 num_ranges_processed;
 
         // Issue first half the scan ranges.
         ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half));
@@ -507,7 +507,7 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
         }
         ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
 
-        AtomicInt<int> num_ranges_processed;
+        AtomicInt32 num_ranges_processed;
         thread_group threads;
         for (int i = 0; i < 5; ++i) {
           threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
@@ -570,7 +570,7 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
         }
         ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
 
-        AtomicInt<int> num_ranges_processed;
+        AtomicInt32 num_ranges_processed;
         int num_succesful_ranges = ranges.size() / 2;
         // Read half the ranges
         for (int i = 0; i < num_succesful_ranges; ++i) {
@@ -637,7 +637,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     // Don't return buffers to force memory pressure
     vector<DiskIoMgr::BufferDescriptor*> buffers;
 
-    AtomicInt<int> num_ranges_processed;
+    AtomicInt32 num_ranges_processed;
     ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::MemLimitExceeded(),
         1, &num_ranges_processed);
 
@@ -717,7 +717,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
     }
     ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
 
-    AtomicInt<int> num_ranges_processed;
+    AtomicInt32 num_ranges_processed;
     thread_group threads;
     for (int i = 0; i < 5; ++i) {
       threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
@@ -780,7 +780,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
         while (read_offset < file_size) {
           for (int context_index = 0; context_index < num_contexts; ++context_index) {
             if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-            AtomicInt<int> num_ranges_processed;
+            AtomicInt32 num_ranges_processed;
             thread_group threads;
             vector<DiskIoMgr::ScanRange*> ranges;
             int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
@@ -898,7 +898,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
             ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges));
           }
 
-          AtomicInt<int> num_ranges_processed;
+          AtomicInt32 num_ranges_processed;
           thread_group threads;
           for (int i = 0; i < NUM_READERS; ++i) {
             for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index d695555..b130d52 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -734,10 +734,10 @@ class DiskIoMgr {
   std::list<BufferDescriptor*> free_buffer_descs_;
 
   /// Total number of allocated buffers, used for debugging.
-  AtomicInt<int> num_allocated_buffers_;
+  AtomicInt32 num_allocated_buffers_;
 
   /// Total number of buffers in readers
-  AtomicInt<int> num_buffers_in_readers_;
+  AtomicInt32 num_buffers_in_readers_;
 
   /// Per disk queues. This is static and created once at Init() time.  One queue is
   /// allocated for each local disk on the system and for each remote filesystem type.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index 005f166..b36a859 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -117,7 +117,7 @@ class LibCache {
 
   /// The number of libs that have been copied from HDFS to the local FS.
   /// This is appended to the local fs path to remove collisions.
-  AtomicInt<int64_t> num_libs_copied_;
+  AtomicInt64 num_libs_copied_;
 
   /// Protects lib_cache_. For lock ordering, this lock must always be taken before
   /// the per entry lock.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 1fc462f..e12c946 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -39,7 +39,7 @@ MemTracker::RequestTrackersMap MemTracker::request_to_mem_trackers_;
 MemTracker::PoolTrackersMap MemTracker::pool_to_mem_trackers_;
 mutex MemTracker::static_mem_trackers_lock_;
 
-AtomicInt<int64_t> MemTracker::released_memory_since_gc_;
+AtomicInt64 MemTracker::released_memory_since_gc_;
 
 // Name for request pool MemTrackers. '$0' is replaced with the pool name.
 const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT = "RequestPool=$0";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 46f08c3..2fd23a7 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -394,7 +394,7 @@ class MemTracker {
 
   /// Total amount of memory from calls to Release() since the last GC. If this
   /// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc.
-  static AtomicInt<int64_t> released_memory_since_gc_;
+  static AtomicInt64 released_memory_since_gc_;
 
   /// Lock to protect GcMemory(). This prevents many GCs from occurring at once.
   boost::mutex gc_lock_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index bb2f3a9..91cece6 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -231,7 +231,7 @@ class PlanFragmentExecutor {
   /// be fired. It is initialized to 0 and atomically swapped to 1 when a completed
   /// fragment report is about to be fired. Used for reducing the probability that a
   /// report is sent twice at the end of the fragment.
-  AtomicInt<int> completed_report_sent_;
+  AtomicInt32 completed_report_sent_;
 
   /// Sampled memory usage at even time intervals.
   RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/scheduling/query-resource-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.cc b/be/src/scheduling/query-resource-mgr.cc
index 6a5f8f6..e1e0581 100644
--- a/be/src/scheduling/query-resource-mgr.cc
+++ b/be/src/scheduling/query-resource-mgr.cc
@@ -97,8 +97,8 @@ void QueryResourceMgr::InitVcoreAcquisition(int32_t init_vcores) {
   // inspects immediately after exiting Expand(), and if true, exits before touching any
   // of the class-wide state (because the destructor may have finished before this point).
 
-  thread_in_expand_.reset(new AtomicInt<int32_t>());
-  early_exit_.reset(new AtomicInt<int32_t>());
+  thread_in_expand_.reset(new AtomicInt32());
+  early_exit_.reset(new AtomicInt32());
   acquire_vcore_thread_.reset(
       new Thread("resource-mgmt", Substitute("acquire-cpu-$0", PrintId(query_id_)),
           bind<void>(mem_fn(&QueryResourceMgr::AcquireVcoreResources), this,
@@ -170,8 +170,8 @@ Status QueryResourceMgr::RequestMemExpansion(int64_t requested_bytes,
 }
 
 void QueryResourceMgr::AcquireVcoreResources(
-    shared_ptr<AtomicInt<int32_t> > thread_in_expand,
-    shared_ptr<AtomicInt<int32_t> > early_exit) {
+    shared_ptr<AtomicInt32> thread_in_expand,
+    shared_ptr<AtomicInt32> early_exit) {
   // Take a copy because we'd like to print it in some cases after the destructor.
   TUniqueId reservation_id = reservation_id_;
   VLOG_QUERY << "Starting Vcore acquisition for: " << reservation_id;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/scheduling/query-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.h b/be/src/scheduling/query-resource-mgr.h
index 589e327..c6080e8 100644
--- a/be/src/scheduling/query-resource-mgr.h
+++ b/be/src/scheduling/query-resource-mgr.h
@@ -195,12 +195,12 @@ class QueryResourceMgr {
   /// parent QueryResourceMgr has been destroyed.
   /// TODO: Combine with ShouldExit(), and replace with AtomicBool when we have such a
   /// thing.
-  boost::shared_ptr<AtomicInt<int32_t> > early_exit_;
+  boost::shared_ptr<AtomicInt32> early_exit_;
 
   /// Signals to the destructor that the vcore acquisition thread is currently in an
   /// Expand() RPC. If so, the destructor does not need to wait for the acquisition thread
   /// to exit.
-  boost::shared_ptr<AtomicInt<int32_t> > thread_in_expand_;
+  boost::shared_ptr<AtomicInt32> thread_in_expand_;
 
   /// Creates the llama resource for the memory and/or cores specified, associated with
   /// the reservation context.
@@ -209,8 +209,8 @@ class QueryResourceMgr {
   /// Run as a thread owned by acquire_cpu_thread_. Waits for notification from
   /// NotifyThreadUsageChange(), then checks the subscription level to decide if more
   /// VCores are needed, and starts a new expansion request if so.
-  void AcquireVcoreResources(boost::shared_ptr<AtomicInt<int32_t> > thread_in_expand,
-      boost::shared_ptr<AtomicInt<int32_t> > early_exit);
+  void AcquireVcoreResources(boost::shared_ptr<AtomicInt32 > thread_in_expand,
+      boost::shared_ptr<AtomicInt32> early_exit);
 
   /// True if thread:VCore subscription is too high, meaning more VCores are required.
   /// Must be called holding threads_running_ lock.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 917c8ec..72e7ac9 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -43,7 +43,7 @@ class CountingBarrier {
   Promise<bool> promise_;
 
   /// The number of pending notifications remaining.
-  AtomicInt<int32_t> count_;
+  AtomicInt32 count_;
 
   DISALLOW_COPY_AND_ASSIGN(CountingBarrier);
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/hdfs-bulk-ops.h
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops.h b/be/src/util/hdfs-bulk-ops.h
index f543a06..432029f 100644
--- a/be/src/util/hdfs-bulk-ops.h
+++ b/be/src/util/hdfs-bulk-ops.h
@@ -125,7 +125,7 @@ class HdfsOperationSet {
 
   /// The number of ops remaining to be executed. Used to coordinate between executor
   /// threads so that when all ops are finished, promise_ is signalled.
-  AtomicInt<int64_t> num_ops_;
+  AtomicInt64 num_ops_;
 
   /// HDFS connection shared between all operations. Not owned by this class.
   hdfsFS* hdfs_connection_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/internal-queue-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/internal-queue-test.cc b/be/src/util/internal-queue-test.cc
index 26e91a5..30bb1e8 100644
--- a/be/src/util/internal-queue-test.cc
+++ b/be/src/util/internal-queue-test.cc
@@ -150,7 +150,7 @@ const int VALIDATE_INTERVAL = 10000;
 
 // CHECK() is not thread safe so return the result in *failed.
 void ProducerThread(InternalQueue<IntNode>* queue, int num_inserts,
-    vector<IntNode>* nodes, AtomicInt<int32_t>* counter, bool* failed) {
+    vector<IntNode>* nodes, AtomicInt32* counter, bool* failed) {
   for (int i = 0; i < num_inserts && !*failed; ++i) {
     // Get the next index to queue.
     int32_t value = counter->Add(1) - 1;
@@ -204,7 +204,7 @@ TEST(InternalQueue, TestClear) {
 
 TEST(InternalQueue, TestSingleProducerSingleConsumer) {
   vector<IntNode> nodes;
-  AtomicInt<int32_t> counter;
+  AtomicInt32 counter;
   nodes.resize(1000000);
   vector<int> results;
 
@@ -233,7 +233,7 @@ TEST(InternalQueue, TestMultiProducerMultiConsumer) {
 
   bool failed = false;
   for (int num_producers = 1; num_producers < 5; num_producers += 3) {
-    AtomicInt<int32_t> counter;
+    AtomicInt32 counter;
     const int NUM_CONSUMERS = 4;
     ASSERT_EQ(nodes.size() % NUM_CONSUMERS, 0);
     ASSERT_EQ(nodes.size() % num_producers, 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/periodic-counter-updater.h
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index 660502e..6887a91 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -137,7 +137,7 @@ class PeriodicCounterUpdater {
   TimeSeriesCounters time_series_counters_;
 
   /// If 1, tear down the update thread.
-  AtomicInt<int32_t> done_;
+  AtomicInt32 done_;
 
   /// Singleton object that keeps track of all rate counters and the thread
   /// for updating them.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/progress-updater.h
----------------------------------------------------------------------
diff --git a/be/src/util/progress-updater.h b/be/src/util/progress-updater.h
index d938221..774a45a 100644
--- a/be/src/util/progress-updater.h
+++ b/be/src/util/progress-updater.h
@@ -75,10 +75,10 @@ class ProgressUpdater {
   int update_period_;
 
   /// Number of completed work items.
-  AtomicInt<int64_t> num_complete_;
+  AtomicInt64 num_complete_;
 
   /// Percentage when the last output was generated.
-  AtomicInt<int> last_output_percentage_;
+  AtomicInt32 last_output_percentage_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 6858c5d..6695b65 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -125,7 +125,7 @@ class RuntimeProfile {
    protected:
     friend class RuntimeProfile;
 
-    AtomicInt<int64_t> value_;
+    AtomicInt64 value_;
     TUnit::type unit_;
   };
 
@@ -175,7 +175,7 @@ class RuntimeProfile {
 
     /// The current value of the counter. value_ in the super class represents
     /// the high water mark.
-    AtomicInt<int64_t> current_value_;
+    AtomicInt64 current_value_;
   };
 
   typedef boost::function<int64_t ()> DerivedCounterFunction;