You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/04/12 23:18:37 UTC
[03/50] incubator-impala git commit: Add AtomicInt32/AtomicInt64
typedefs
Add AtomicInt32/AtomicInt64 typedefs
Only certain primitive types can be used with AtomicInt<>, so let's
treat AtomicInt<> as an implementation detail and instead expose
typedefs for the Atomic integer sizes we support. Convert all decls to
use the new typedefs.
Change-Id: I8b91ba684aabc67ed1721c7b7320aa42049268c8
Reviewed-on: http://gerrit.cloudera.org:8080/2601
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/10e7de79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/10e7de79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/10e7de79
Branch: refs/heads/master
Commit: 10e7de7969f96fe3edb00fcab3f7a8c432bd20a0
Parents: c039791
Author: Dan Hecht <dh...@cloudera.com>
Authored: Tue Mar 22 14:24:23 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Mar 23 22:29:18 2016 +0000
----------------------------------------------------------------------
be/src/common/atomic-test.cc | 3 ++-
be/src/common/atomic.h | 12 +++++++++++-
be/src/exec/hdfs-scan-node.h | 8 ++++----
be/src/rpc/rpc-trace.h | 2 +-
be/src/runtime/data-stream-recvr.h | 2 +-
be/src/runtime/disk-io-mgr-internal.h | 24 ++++++++++++------------
be/src/runtime/disk-io-mgr-test.cc | 18 +++++++++---------
be/src/runtime/disk-io-mgr.h | 4 ++--
be/src/runtime/lib-cache.h | 2 +-
be/src/runtime/mem-tracker.cc | 2 +-
be/src/runtime/mem-tracker.h | 2 +-
be/src/runtime/plan-fragment-executor.h | 2 +-
be/src/scheduling/query-resource-mgr.cc | 8 ++++----
be/src/scheduling/query-resource-mgr.h | 8 ++++----
be/src/util/counting-barrier.h | 2 +-
be/src/util/hdfs-bulk-ops.h | 2 +-
be/src/util/internal-queue-test.cc | 6 +++---
be/src/util/periodic-counter-updater.h | 2 +-
be/src/util/progress-updater.h | 4 ++--
be/src/util/runtime-profile.h | 4 ++--
20 files changed, 64 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/common/atomic-test.cc
----------------------------------------------------------------------
diff --git a/be/src/common/atomic-test.cc b/be/src/common/atomic-test.cc
index 5715022..4e4559c 100644
--- a/be/src/common/atomic-test.cc
+++ b/be/src/common/atomic-test.cc
@@ -20,9 +20,10 @@
#include "common/names.h"
-
namespace impala {
+using namespace internal; // Testing AtomicInt<> directly.
+
// Simple test to make sure there is no obvious error in the operations. This is not
// intended to test the thread safety.
template<typename T>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/common/atomic.h
----------------------------------------------------------------------
diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h
index 07a9036..b0b8faa 100644
--- a/be/src/common/atomic.h
+++ b/be/src/common/atomic.h
@@ -45,7 +45,10 @@ class AtomicUtil {
}
};
-/// Atomic integer. 'T' can be either 32-bit or 64-bit signed integer. Each operation
+namespace internal {
+
+/// Atomic integer. This class template should not be used directly; instead use the
+/// typedefs below. 'T' can be either 32-bit or 64-bit signed integer. Each operation
/// is performed atomically and has a specified memory-ordering semantic:
///
/// Acquire: these operations ensure no later memory access by the same thread can be
@@ -102,6 +105,13 @@ class AtomicInt {
DISALLOW_COPY_AND_ASSIGN(AtomicInt);
};
+} // namespace internal
+
+/// Supported atomic types. Use these types rather than referring to AtomicInt<>
+/// directly.
+typedef internal::AtomicInt<int32_t> AtomicInt32;
+typedef internal::AtomicInt<int64_t> AtomicInt64;
+
}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 4ddd33d..73b9527 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -369,7 +369,7 @@ class HdfsScanNode : public ScanNode {
int64_t scanner_thread_bytes_required_;
/// Number of files that have not been issued from the scanners.
- AtomicInt<int> num_unqueued_files_;
+ AtomicInt32 num_unqueued_files_;
/// Map of HdfsScanner objects to file types. Only one scanner object will be
/// created for each file type. Objects stored in runtime_state's pool.
@@ -424,12 +424,12 @@ class HdfsScanNode : public ScanNode {
/// This is the number of io buffers that are owned by the scan node and the scanners.
/// This is used just to help debug leaked io buffers to determine if the leak is
/// happening in the scanners vs other parts of the execution.
- AtomicInt<int> num_owned_io_buffers_;
+ AtomicInt32 num_owned_io_buffers_;
/// Counters which track the number of scanners that have codegen enabled for the
/// materialize and conjuncts evaluation code paths.
- AtomicInt<int> num_scanners_codegen_enabled_;
- AtomicInt<int> num_scanners_codegen_disabled_;
+ AtomicInt32 num_scanners_codegen_enabled_;
+ AtomicInt32 num_scanners_codegen_disabled_;
/// The size of the largest compressed text file to be scanned. This is used to
/// estimate scanner thread memory usage.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/rpc/rpc-trace.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-trace.h b/be/src/rpc/rpc-trace.h
index aed05f4..5abe0a2 100644
--- a/be/src/rpc/rpc-trace.h
+++ b/be/src/rpc/rpc-trace.h
@@ -84,7 +84,7 @@ class RpcEventHandler : public apache::thrift::TProcessorEventHandler {
StatsMetric<double>* time_stats;
/// Number of invocations in flight
- AtomicInt<int32_t> num_in_flight;
+ AtomicInt32 num_in_flight;
};
/// Map from method name to descriptor
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
index 7879c44..2a3b184 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -136,7 +136,7 @@ class DataStreamRecvr {
bool is_merging_;
/// total number of bytes held across all sender queues.
- AtomicInt<int> num_buffered_bytes_;
+ AtomicInt32 num_buffered_bytes_;
/// Memtracker for batches in the sender queue(s).
boost::scoped_ptr<MemTracker> mem_tracker_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
index fe607e7..e58a3f3 100644
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ b/be/src/runtime/disk-io-mgr-internal.h
@@ -212,48 +212,48 @@ class DiskIoMgr::RequestContext {
RuntimeProfile::Counter* disks_accessed_bitmap_;
/// Total number of bytes read locally, updated at end of each range scan
- AtomicInt<int64_t> bytes_read_local_;
+ AtomicInt64 bytes_read_local_;
/// Total number of bytes read via short circuit read, updated at end of each range scan
- AtomicInt<int64_t> bytes_read_short_circuit_;
+ AtomicInt64 bytes_read_short_circuit_;
/// Total number of bytes read from date node cache, updated at end of each range scan
- AtomicInt<int64_t> bytes_read_dn_cache_;
+ AtomicInt64 bytes_read_dn_cache_;
/// Total number of bytes from remote reads that were expected to be local.
- AtomicInt<int64_t> unexpected_remote_bytes_;
+ AtomicInt64 unexpected_remote_bytes_;
/// The number of buffers that have been returned to the reader (via GetNext) that the
/// reader has not returned. Only included for debugging and diagnostics.
- AtomicInt<int> num_buffers_in_reader_;
+ AtomicInt32 num_buffers_in_reader_;
/// The number of scan ranges that have been completed for this reader.
- AtomicInt<int> num_finished_ranges_;
+ AtomicInt32 num_finished_ranges_;
/// The number of scan ranges that required a remote read, updated at the end of each
/// range scan. Only used for diagnostics.
- AtomicInt<int> num_remote_ranges_;
+ AtomicInt32 num_remote_ranges_;
/// The total number of scan ranges that have not been started. Only used for
/// diagnostics. This is the sum of all unstarted_scan_ranges across all disks.
- AtomicInt<int> num_unstarted_scan_ranges_;
+ AtomicInt32 num_unstarted_scan_ranges_;
/// The number of buffers that are being used for this reader. This is the sum
/// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
/// to be queued).
- AtomicInt<int> num_used_buffers_;
+ AtomicInt32 num_used_buffers_;
/// The total number of ready buffers across all ranges. Ready buffers are buffers
/// that have been read from disk but not retrieved by the caller.
/// This is the sum of all queued buffers in all ranges for this reader context.
- AtomicInt<int> num_ready_buffers_;
+ AtomicInt32 num_ready_buffers_;
/// The total (sum) of queue capacities for finished scan ranges. This value
/// divided by num_finished_ranges_ is the average for finished ranges and
/// used to seed the starting queue capacity for future ranges. The assumption
/// is that if previous ranges were fast, new ones will be fast too. The scan
/// range adjusts the queue capacity dynamically so a rough approximation will do.
- AtomicInt<int> total_range_queue_capacity_;
+ AtomicInt32 total_range_queue_capacity_;
/// The initial queue size for new scan ranges. This is always
/// total_range_queue_capacity_ / num_finished_ranges_ but stored as a separate
@@ -447,7 +447,7 @@ class DiskIoMgr::RequestContext {
/// entire operation, we need this ref count to keep track of which thread should do
/// final resource cleanup during cancellation.
/// Only the thread that sees the count at 0 should do the final cleanup.
- AtomicInt<int> num_threads_in_op_;
+ AtomicInt32 num_threads_in_op_;
/// Queue of write ranges to process for this disk. A write range is always added
/// to in_flight_ranges_ in GetNextRequestRange(). There is a separate
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 806a036..ee89f56 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -136,7 +136,7 @@ class DiskIoMgrTest : public testing::Test {
// Updates num_ranges_processed with the number of ranges seen by this thread.
static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
const char* expected_result, int expected_len, const Status& expected_status,
- int max_ranges, AtomicInt<int>* num_ranges_processed) {
+ int max_ranges, AtomicInt32* num_ranges_processed) {
int num_ranges = 0;
while (max_ranges == 0 || num_ranges < max_ranges) {
DiskIoMgr::ScanRange* range;
@@ -373,7 +373,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
}
ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
- AtomicInt<int> num_ranges_processed;
+ AtomicInt32 num_ranges_processed;
thread_group threads;
for (int i = 0; i < num_read_threads; ++i) {
threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
@@ -432,7 +432,7 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
stat_val.st_mtime));
}
}
- AtomicInt<int> num_ranges_processed;
+ AtomicInt32 num_ranges_processed;
// Issue first half the scan ranges.
ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half));
@@ -507,7 +507,7 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
}
ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
- AtomicInt<int> num_ranges_processed;
+ AtomicInt32 num_ranges_processed;
thread_group threads;
for (int i = 0; i < 5; ++i) {
threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
@@ -570,7 +570,7 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
}
ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
- AtomicInt<int> num_ranges_processed;
+ AtomicInt32 num_ranges_processed;
int num_succesful_ranges = ranges.size() / 2;
// Read half the ranges
for (int i = 0; i < num_succesful_ranges; ++i) {
@@ -637,7 +637,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
// Don't return buffers to force memory pressure
vector<DiskIoMgr::BufferDescriptor*> buffers;
- AtomicInt<int> num_ranges_processed;
+ AtomicInt32 num_ranges_processed;
ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::MemLimitExceeded(),
1, &num_ranges_processed);
@@ -717,7 +717,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
}
ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
- AtomicInt<int> num_ranges_processed;
+ AtomicInt32 num_ranges_processed;
thread_group threads;
for (int i = 0; i < 5; ++i) {
threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
@@ -780,7 +780,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
while (read_offset < file_size) {
for (int context_index = 0; context_index < num_contexts; ++context_index) {
if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
- AtomicInt<int> num_ranges_processed;
+ AtomicInt32 num_ranges_processed;
thread_group threads;
vector<DiskIoMgr::ScanRange*> ranges;
int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
@@ -898,7 +898,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges));
}
- AtomicInt<int> num_ranges_processed;
+ AtomicInt32 num_ranges_processed;
thread_group threads;
for (int i = 0; i < NUM_READERS; ++i) {
for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index d695555..b130d52 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -734,10 +734,10 @@ class DiskIoMgr {
std::list<BufferDescriptor*> free_buffer_descs_;
/// Total number of allocated buffers, used for debugging.
- AtomicInt<int> num_allocated_buffers_;
+ AtomicInt32 num_allocated_buffers_;
/// Total number of buffers in readers
- AtomicInt<int> num_buffers_in_readers_;
+ AtomicInt32 num_buffers_in_readers_;
/// Per disk queues. This is static and created once at Init() time. One queue is
/// allocated for each local disk on the system and for each remote filesystem type.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index 005f166..b36a859 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -117,7 +117,7 @@ class LibCache {
/// The number of libs that have been copied from HDFS to the local FS.
/// This is appended to the local fs path to remove collisions.
- AtomicInt<int64_t> num_libs_copied_;
+ AtomicInt64 num_libs_copied_;
/// Protects lib_cache_. For lock ordering, this lock must always be taken before
/// the per entry lock.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 1fc462f..e12c946 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -39,7 +39,7 @@ MemTracker::RequestTrackersMap MemTracker::request_to_mem_trackers_;
MemTracker::PoolTrackersMap MemTracker::pool_to_mem_trackers_;
mutex MemTracker::static_mem_trackers_lock_;
-AtomicInt<int64_t> MemTracker::released_memory_since_gc_;
+AtomicInt64 MemTracker::released_memory_since_gc_;
// Name for request pool MemTrackers. '$0' is replaced with the pool name.
const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT = "RequestPool=$0";
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 46f08c3..2fd23a7 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -394,7 +394,7 @@ class MemTracker {
/// Total amount of memory from calls to Release() since the last GC. If this
/// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc.
- static AtomicInt<int64_t> released_memory_since_gc_;
+ static AtomicInt64 released_memory_since_gc_;
/// Lock to protect GcMemory(). This prevents many GCs from occurring at once.
boost::mutex gc_lock_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index bb2f3a9..91cece6 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -231,7 +231,7 @@ class PlanFragmentExecutor {
/// be fired. It is initialized to 0 and atomically swapped to 1 when a completed
/// fragment report is about to be fired. Used for reducing the probability that a
/// report is sent twice at the end of the fragment.
- AtomicInt<int> completed_report_sent_;
+ AtomicInt32 completed_report_sent_;
/// Sampled memory usage at even time intervals.
RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/scheduling/query-resource-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.cc b/be/src/scheduling/query-resource-mgr.cc
index 6a5f8f6..e1e0581 100644
--- a/be/src/scheduling/query-resource-mgr.cc
+++ b/be/src/scheduling/query-resource-mgr.cc
@@ -97,8 +97,8 @@ void QueryResourceMgr::InitVcoreAcquisition(int32_t init_vcores) {
// inspects immediately after exiting Expand(), and if true, exits before touching any
// of the class-wide state (because the destructor may have finished before this point).
- thread_in_expand_.reset(new AtomicInt<int32_t>());
- early_exit_.reset(new AtomicInt<int32_t>());
+ thread_in_expand_.reset(new AtomicInt32());
+ early_exit_.reset(new AtomicInt32());
acquire_vcore_thread_.reset(
new Thread("resource-mgmt", Substitute("acquire-cpu-$0", PrintId(query_id_)),
bind<void>(mem_fn(&QueryResourceMgr::AcquireVcoreResources), this,
@@ -170,8 +170,8 @@ Status QueryResourceMgr::RequestMemExpansion(int64_t requested_bytes,
}
void QueryResourceMgr::AcquireVcoreResources(
- shared_ptr<AtomicInt<int32_t> > thread_in_expand,
- shared_ptr<AtomicInt<int32_t> > early_exit) {
+ shared_ptr<AtomicInt32> thread_in_expand,
+ shared_ptr<AtomicInt32> early_exit) {
// Take a copy because we'd like to print it in some cases after the destructor.
TUniqueId reservation_id = reservation_id_;
VLOG_QUERY << "Starting Vcore acquisition for: " << reservation_id;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/scheduling/query-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.h b/be/src/scheduling/query-resource-mgr.h
index 589e327..c6080e8 100644
--- a/be/src/scheduling/query-resource-mgr.h
+++ b/be/src/scheduling/query-resource-mgr.h
@@ -195,12 +195,12 @@ class QueryResourceMgr {
/// parent QueryResourceMgr has been destroyed.
/// TODO: Combine with ShouldExit(), and replace with AtomicBool when we have such a
/// thing.
- boost::shared_ptr<AtomicInt<int32_t> > early_exit_;
+ boost::shared_ptr<AtomicInt32> early_exit_;
/// Signals to the destructor that the vcore acquisition thread is currently in an
/// Expand() RPC. If so, the destructor does not need to wait for the acquisition thread
/// to exit.
- boost::shared_ptr<AtomicInt<int32_t> > thread_in_expand_;
+ boost::shared_ptr<AtomicInt32> thread_in_expand_;
/// Creates the llama resource for the memory and/or cores specified, associated with
/// the reservation context.
@@ -209,8 +209,8 @@ class QueryResourceMgr {
/// Run as a thread owned by acquire_cpu_thread_. Waits for notification from
/// NotifyThreadUsageChange(), then checks the subscription level to decide if more
/// VCores are needed, and starts a new expansion request if so.
- void AcquireVcoreResources(boost::shared_ptr<AtomicInt<int32_t> > thread_in_expand,
- boost::shared_ptr<AtomicInt<int32_t> > early_exit);
+ void AcquireVcoreResources(boost::shared_ptr<AtomicInt32 > thread_in_expand,
+ boost::shared_ptr<AtomicInt32> early_exit);
/// True if thread:VCore subscription is too high, meaning more VCores are required.
/// Must be called holding threads_running_ lock.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 917c8ec..72e7ac9 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -43,7 +43,7 @@ class CountingBarrier {
Promise<bool> promise_;
/// The number of pending notifications remaining.
- AtomicInt<int32_t> count_;
+ AtomicInt32 count_;
DISALLOW_COPY_AND_ASSIGN(CountingBarrier);
};
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/hdfs-bulk-ops.h
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops.h b/be/src/util/hdfs-bulk-ops.h
index f543a06..432029f 100644
--- a/be/src/util/hdfs-bulk-ops.h
+++ b/be/src/util/hdfs-bulk-ops.h
@@ -125,7 +125,7 @@ class HdfsOperationSet {
/// The number of ops remaining to be executed. Used to coordinate between executor
/// threads so that when all ops are finished, promise_ is signalled.
- AtomicInt<int64_t> num_ops_;
+ AtomicInt64 num_ops_;
/// HDFS connection shared between all operations. Not owned by this class.
hdfsFS* hdfs_connection_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/internal-queue-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/internal-queue-test.cc b/be/src/util/internal-queue-test.cc
index 26e91a5..30bb1e8 100644
--- a/be/src/util/internal-queue-test.cc
+++ b/be/src/util/internal-queue-test.cc
@@ -150,7 +150,7 @@ const int VALIDATE_INTERVAL = 10000;
// CHECK() is not thread safe so return the result in *failed.
void ProducerThread(InternalQueue<IntNode>* queue, int num_inserts,
- vector<IntNode>* nodes, AtomicInt<int32_t>* counter, bool* failed) {
+ vector<IntNode>* nodes, AtomicInt32* counter, bool* failed) {
for (int i = 0; i < num_inserts && !*failed; ++i) {
// Get the next index to queue.
int32_t value = counter->Add(1) - 1;
@@ -204,7 +204,7 @@ TEST(InternalQueue, TestClear) {
TEST(InternalQueue, TestSingleProducerSingleConsumer) {
vector<IntNode> nodes;
- AtomicInt<int32_t> counter;
+ AtomicInt32 counter;
nodes.resize(1000000);
vector<int> results;
@@ -233,7 +233,7 @@ TEST(InternalQueue, TestMultiProducerMultiConsumer) {
bool failed = false;
for (int num_producers = 1; num_producers < 5; num_producers += 3) {
- AtomicInt<int32_t> counter;
+ AtomicInt32 counter;
const int NUM_CONSUMERS = 4;
ASSERT_EQ(nodes.size() % NUM_CONSUMERS, 0);
ASSERT_EQ(nodes.size() % num_producers, 0);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/periodic-counter-updater.h
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index 660502e..6887a91 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -137,7 +137,7 @@ class PeriodicCounterUpdater {
TimeSeriesCounters time_series_counters_;
/// If 1, tear down the update thread.
- AtomicInt<int32_t> done_;
+ AtomicInt32 done_;
/// Singleton object that keeps track of all rate counters and the thread
/// for updating them.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/progress-updater.h
----------------------------------------------------------------------
diff --git a/be/src/util/progress-updater.h b/be/src/util/progress-updater.h
index d938221..774a45a 100644
--- a/be/src/util/progress-updater.h
+++ b/be/src/util/progress-updater.h
@@ -75,10 +75,10 @@ class ProgressUpdater {
int update_period_;
/// Number of completed work items.
- AtomicInt<int64_t> num_complete_;
+ AtomicInt64 num_complete_;
/// Percentage when the last output was generated.
- AtomicInt<int> last_output_percentage_;
+ AtomicInt32 last_output_percentage_;
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/10e7de79/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 6858c5d..6695b65 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -125,7 +125,7 @@ class RuntimeProfile {
protected:
friend class RuntimeProfile;
- AtomicInt<int64_t> value_;
+ AtomicInt64 value_;
TUnit::type unit_;
};
@@ -175,7 +175,7 @@ class RuntimeProfile {
/// The current value of the counter. value_ in the super class represents
/// the high water mark.
- AtomicInt<int64_t> current_value_;
+ AtomicInt64 current_value_;
};
typedef boost::function<int64_t ()> DerivedCounterFunction;