You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/26 02:41:52 UTC

[2/4] incubator-impala git commit: Refactor RuntimeState and ExecEnv dependencies

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 8c423c0..c970163 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include "runtime/runtime-state.h"
+
 #include <iostream>
 #include <jni.h>
 #include <sstream>
@@ -27,11 +29,11 @@
 #include "exprs/expr.h"
 #include "runtime/buffered-block-mgr.h"
 #include "runtime/descriptors.h"
-#include "runtime/runtime-state.h"
-#include "runtime/timestamp-value.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/data-stream-recvr.h"
-#include "runtime/runtime-filter.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/runtime-filter-bank.h"
+#include "runtime/timestamp-value.h"
 #include "util/bitmap.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -75,7 +77,7 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params,
     is_cancelled_(false),
     query_resource_mgr_(NULL),
     root_node_id_(-1),
-    filter_bank_(fragment_ctx().query_ctx, this) {
+    filter_bank_(new RuntimeFilterBank(fragment_ctx().query_ctx, this)) {
   Status status = Init(exec_env);
   DCHECK(status.ok()) << status.GetDetail();
 }
@@ -90,7 +92,7 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx)
     is_cancelled_(false),
     query_resource_mgr_(NULL),
     root_node_id_(-1),
-    filter_bank_(query_ctx, this) {
+    filter_bank_(new RuntimeFilterBank(query_ctx, this)) {
   fragment_params_.fragment_instance_ctx.__set_query_ctx(query_ctx);
   fragment_params_.fragment_instance_ctx.query_ctx.request.query_options
       .__set_batch_size(DEFAULT_BATCH_SIZE);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 860692f..15c8d9c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -16,40 +16,34 @@
 #ifndef IMPALA_RUNTIME_RUNTIME_STATE_H
 #define IMPALA_RUNTIME_RUNTIME_STATE_H
 
-/// needed for scoped_ptr to work on ObjectPool
-#include "common/object-pool.h"
-
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include <vector>
 #include <string>
-/// stringstream is a typedef, so can't forward declare it.
-#include <sstream>
 
-#include "scheduling/query-resource-mgr.h"
+// NOTE: try not to add more headers here: runtime-state.h is included in many many files.
+#include "common/global-types.h"  // for PlanNodeId
+#include "runtime/client-cache-types.h"
 #include "runtime/exec-env.h"
-#include "runtime/descriptors.h"  // for PlanNodeId
-#include "runtime/disk-io-mgr.h"  // for DiskIoMgr::RequestContext
-#include "runtime/mem-tracker.h"
-#include "runtime/runtime-filter.h"
 #include "runtime/thread-resource-mgr.h"
-#include "gen-cpp/PlanNodes_types.h"
-#include "gen-cpp/Types_types.h"  // for TUniqueId
-#include "gen-cpp/ImpalaInternalService_types.h"  // for TQueryOptions
-#include "util/auth-util.h"
+#include "util/auth-util.h" // for GetEffectiveUser()
 #include "util/runtime-profile.h"
 
 namespace impala {
 
 class BufferedBlockMgr;
+class DataStreamRecvr;
 class DescriptorTbl;
-class ObjectPool;
-class Status;
-class ExecEnv;
+class DiskIoRequestContext;
 class Expr;
 class LlvmCodeGen;
+class MemTracker;
+class ObjectPool;
+class RuntimeFilterBank;
+class Status;
 class TimestampValue;
-class DataStreamRecvr;
+class TQueryOptions;
+class TUniqueId;
 
 /// Counts how many rows an INSERT query has added to a particular partition
 /// (partitions are identified by their partition keys: k1=v1/k2=v2
@@ -140,7 +134,7 @@ class RuntimeState {
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
   FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
-  std::vector<DiskIoMgr::RequestContext*>* reader_contexts() { return &reader_contexts_; }
+  std::vector<DiskIoRequestContext*>* reader_contexts() { return &reader_contexts_; }
 
   void set_fragment_root_id(PlanNodeId id) {
     DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
@@ -151,7 +145,7 @@ class RuntimeState {
   /// See comment on root_node_id_. We add one to prevent having a hash seed of 0.
   uint32_t fragment_hash_seed() const { return root_node_id_ + 1; }
 
-  RuntimeFilterBank* filter_bank() { return &filter_bank_; }
+  RuntimeFilterBank* filter_bank() { return filter_bank_.get(); }
 
   PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
 
@@ -348,7 +342,7 @@ class RuntimeState {
   QueryResourceMgr* query_resource_mgr_;
 
   /// Reader contexts that need to be closed when the fragment is closed.
-  std::vector<DiskIoMgr::RequestContext*> reader_contexts_;
+  std::vector<DiskIoRequestContext*> reader_contexts_;
 
   /// BufferedBlockMgr object used to allocate and manage blocks of input data in memory
   /// with a fixed memory budget.
@@ -366,7 +360,7 @@ class RuntimeState {
 
   /// Manages runtime filters that are either produced or consumed (or both!) by plan
   /// nodes that share this runtime state.
-  RuntimeFilterBank filter_bank_;
+  boost::scoped_ptr<RuntimeFilterBank> filter_bank_;
 
   /// prohibit copies
   RuntimeState(const RuntimeState&);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index 0376437..328d1c2 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -18,7 +18,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/sorter.h"
 #include "runtime/tuple-row.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index e0d388a..299dd2f 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -22,7 +22,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index c8f79df..ec59806 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -18,6 +18,7 @@
 
 #include "common/status.h"
 #include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/string-value.h"
 
 using namespace strings;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/thread-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.h b/be/src/runtime/thread-resource-mgr.h
index cb2c13e..00aaeba 100644
--- a/be/src/runtime/thread-resource-mgr.h
+++ b/be/src/runtime/thread-resource-mgr.h
@@ -18,10 +18,7 @@
 #include <stdlib.h>
 
 #include <boost/function.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/shared_ptr.hpp>
 #include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
 
 #include <list>
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/tuple.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index fe85948..93e63f3 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -29,6 +29,7 @@
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
 #include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 70174a1..94e21b8 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -25,7 +25,7 @@
 #include "runtime/mem-tracker.h"
 #include "util/debug-util.h"
 #include "util/time.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/pretty-printer.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/query-resource-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.cc b/be/src/scheduling/query-resource-mgr.cc
index 5687269..bf42329 100644
--- a/be/src/scheduling/query-resource-mgr.cc
+++ b/be/src/scheduling/query-resource-mgr.cc
@@ -22,6 +22,7 @@
 #include "runtime/exec-env.h"
 #include "resourcebroker/resource-broker.h"
 #include "util/bit-util.h"
+#include "util/cgroups-mgr.h"
 #include "util/container-util.h"
 #include "util/network-util.h"
 #include "util/promise.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 7abc916..a14509c 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -24,6 +24,7 @@
 
 #include "common/logging.h"
 #include "util/metrics.h"
+#include "resourcebroker/resource-broker.h"
 #include "runtime/exec-env.h"
 #include "runtime/coordinator.h"
 #include "service/impala-server.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index 680e295..38866f1 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -20,6 +20,7 @@
 #include "gen-cpp/ImpalaInternalService.h"
 #include "rpc/thrift-util.h"
 #include "gutil/strings/substitute.h"
+#include "runtime/runtime-filter-bank.h"
 #include "util/bloom-filter.h"
 #include "runtime/backend-client.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/fragment-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc
index 5210f8c..f64ca7e 100644
--- a/be/src/service/fragment-mgr.cc
+++ b/be/src/service/fragment-mgr.cc
@@ -20,6 +20,7 @@
 
 #include "service/fragment-exec-state.h"
 #include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
 #include "util/impalad-metrics.h"
 #include "util/uid-util.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 478c1ea..35cb945 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -41,6 +41,7 @@
 #include "runtime/plan-fragment-executor.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "scheduling/simple-scheduler.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 1dc4982..ce5df38 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -31,11 +31,12 @@
 #include "common/logging.h"
 #include "common/version.h"
 #include "exprs/expr.h"
+#include "rpc/thrift-util.h"
 #include "runtime/raw-value.h"
 #include "service/query-exec-state.h"
 #include "service/query-options.h"
 #include "util/debug-util.h"
-#include "rpc/thrift-util.h"
+#include "util/runtime-profile-counters.h"
 #include "util/impalad-metrics.h"
 #include "util/string-parser.h"
 #include "service/hs2-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index b534902..ec70c47 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -18,6 +18,8 @@
 
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
+#include "resourcebroker/resource-broker.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "service/impala-server.h"
@@ -25,6 +27,7 @@
 #include "service/query-options.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
 #include "util/time.h"
 
 #include "gen-cpp/CatalogService.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 3808459..ddf663e 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -21,23 +21,22 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
 
 #include "statestore/statestore.h"
 #include "util/stopwatch.h"
 #include "rpc/thrift-util.h"
 #include "rpc/thrift-client.h"
-#include "util/thread.h"
 #include "util/metrics.h"
 #include "gen-cpp/StatestoreService.h"
 #include "gen-cpp/StatestoreSubscriber.h"
 
 namespace impala {
 
-class TimeoutFailureDetector;
 class Status;
-class TNetworkAddress;
+class TimeoutFailureDetector;
+class Thread;
 class ThriftServer;
+class TNetworkAddress;
 
 typedef ClientCache<StatestoreServiceClient> StatestoreClientCache;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/testutil/desc-tbl-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/desc-tbl-builder.cc b/be/src/testutil/desc-tbl-builder.cc
index 358ae54..3b3994a 100644
--- a/be/src/testutil/desc-tbl-builder.cc
+++ b/be/src/testutil/desc-tbl-builder.cc
@@ -15,7 +15,7 @@
 #include "testutil/desc-tbl-builder.h"
 #include "util/bit-util.h"
 
-
+#include "common/object-pool.h"
 #include "runtime/descriptors.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/testutil/desc-tbl-builder.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/desc-tbl-builder.h b/be/src/testutil/desc-tbl-builder.h
index ec816ee..fdaf67e 100644
--- a/be/src/testutil/desc-tbl-builder.h
+++ b/be/src/testutil/desc-tbl-builder.h
@@ -16,6 +16,7 @@
 #define IMPALA_TESTUTIL_ROW_DESC_BUILDER_H_
 
 #include "runtime/runtime-state.h"
+#include "runtime/types.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/auth-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/auth-util.cc b/be/src/util/auth-util.cc
index e75a250..afe037c 100644
--- a/be/src/util/auth-util.cc
+++ b/be/src/util/auth-util.cc
@@ -14,6 +14,8 @@
 
 #include "util/auth-util.h"
 
+#include "gen-cpp/ImpalaInternalService_types.h"
+
 using namespace std;
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/auth-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/auth-util.h b/be/src/util/auth-util.h
index e44bf70..d82c61a 100644
--- a/be/src/util/auth-util.h
+++ b/be/src/util/auth-util.h
@@ -18,16 +18,16 @@
 
 #include <string>
 
-#include "gen-cpp/ImpalaInternalService_types.h"
-
 namespace impala {
 
-  /// Returns a reference to the "effective user" from the specified session. Queries
-  /// are run and authorized on behalf of the effective user. When a delegated_user is
-  /// specified (is not empty), the effective user is the delegated_user. This is because
-  /// the connected_user is acting as a "proxy user" for the delegated_user. When
-  /// delegated_user is empty, the effective user is the connected user.
-  const std::string& GetEffectiveUser(const TSessionState& session);
+class TSessionState;
+
+/// Returns a reference to the "effective user" from the specified session. Queries
+/// are run and authorized on behalf of the effective user. When a delegated_user is
+/// specified (is not empty), the effective user is the delegated_user. This is because
+/// the connected_user is acting as a "proxy user" for the delegated_user. When
+/// delegated_user is empty, the effective user is the connected user.
+const std::string& GetEffectiveUser(const TSessionState& session);
 
 } // namespace impala
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/avro-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/avro-util.cc b/be/src/util/avro-util.cc
index b591f23..5cce8a3 100644
--- a/be/src/util/avro-util.cc
+++ b/be/src/util/avro-util.cc
@@ -12,10 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <avro/schema.h>
-
 #include "util/avro-util.h"
 
+#include <avro/schema.h>
+#include <sstream>
+
 using namespace std;
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index afe3791..a337983 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -17,8 +17,6 @@
 #define IMPALA_UTIL_CODEC_H
 
 #include "common/status.h"
-#include "runtime/mem-pool.h"
-#include "util/runtime-profile.h"
 
 #include <boost/scoped_ptr.hpp>
 #include "gen-cpp/Descriptors_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 588aff6..8a2ea74 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -15,6 +15,7 @@
 #include <boost/assign/list_of.hpp>
 #include "util/decompress.h"
 #include "exec/read-write-util.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "common/logging.h"
 #include "gen-cpp/Descriptors_types.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 7a48886..795a6b1 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -24,7 +24,6 @@
 #include "runtime/mem-pool.h"
 #include "runtime/string-value.h"
 #include "util/rle-encoding.h"
-#include "util/runtime-profile.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/hdfs-bulk-ops-defs.h
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops-defs.h b/be/src/util/hdfs-bulk-ops-defs.h
new file mode 100644
index 0000000..13e1be0
--- /dev/null
+++ b/be/src/util/hdfs-bulk-ops-defs.h
@@ -0,0 +1,31 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_UTIL_HDFS_BULK_OPS_DEFS_H
+#define IMPALA_UTIL_HDFS_BULK_OPS_DEFS_H
+
+namespace impala {
+
+/// Forward declarations for HDFS ops (HdfsOp itself is defined in hdfs-bulk-ops.h).
+template <typename T>
+class ThreadPool;
+
+class HdfsOp;
+class HdfsOperationSet;
+
+typedef ThreadPool<HdfsOp> HdfsOpThreadPool;
+
+} // namespace impala
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/hdfs-bulk-ops.h
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops.h b/be/src/util/hdfs-bulk-ops.h
index e66f0d1..97feaa0 100644
--- a/be/src/util/hdfs-bulk-ops.h
+++ b/be/src/util/hdfs-bulk-ops.h
@@ -21,6 +21,7 @@
 #include "common/hdfs.h"
 #include "common/atomic.h"
 #include "common/status.h"
+#include "util/hdfs-bulk-ops-defs.h"
 #include "util/thread-pool.h"
 #include "util/counting-barrier.h"
 #include "runtime/hdfs-fs-cache.h"
@@ -36,8 +37,6 @@ enum HdfsOpType {
   CHMOD
 };
 
-class HdfsOperationSet;
-
 /// Container class that encapsulates a single HDFS operation. Used only internally by
 /// HdfsOperationSet, but visible because it parameterises HdfsOpThreadPool.
 class HdfsOp {
@@ -81,8 +80,6 @@ class HdfsOp {
   void AddError(const string& error_msg) const;
 };
 
-typedef ThreadPool<HdfsOp> HdfsOpThreadPool;
-
 /// Creates a new HdfsOp-processing thread pool.
 HdfsOpThreadPool* CreateHdfsOpThreadPool(const std::string& name, uint32_t num_threads,
     uint32_t max_queue_length);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index ab4b6c2..9ddaa48 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -24,8 +24,8 @@
 #include <boost/thread/locks.hpp>
 
 #include "common/logging.h"
-#include "common/status.h"
 #include "common/object-pool.h"
+#include "common/status.h"
 #include "util/debug-util.h"
 #include "util/json-util.h"
 #include "util/pretty-printer.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/periodic-counter-updater.cc
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index a3979d8..d1cbac5 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -14,6 +14,7 @@
 
 #include "util/periodic-counter-updater.h"
 
+#include "util/runtime-profile-counters.h"
 #include "util/time.h"
 
 #include "common/names.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
new file mode 100644
index 0000000..4b51c4f
--- /dev/null
+++ b/be/src/util/runtime-profile-counters.h
@@ -0,0 +1,488 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+#ifndef IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
+#define IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/unordered_map.hpp>
+#include <sys/time.h>
+#include <sys/resource.h>
+
+#include "common/atomic.h"
+#include "common/logging.h"
+#include "util/runtime-profile.h"
+#include "util/stopwatch.h"
+#include "util/streaming-sampler.h"
+
+namespace impala {
+
+/// Macros for updating counters. Set ENABLE_COUNTERS to 0 to compile all counters out,
+/// e.g. to check that they aren't affecting the system. Every macro must take the same
+/// arguments in both branches below so call sites build with either setting.
+#define ENABLE_COUNTERS 1
+
+/// Some macro magic to generate unique ids using __COUNTER__
+#define CONCAT_IMPL(x, y) x##y
+#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
+
+#if ENABLE_COUNTERS
+  #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
+  #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
+      (profile)->AddTimeSeriesCounter(name, src_counter)
+  #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
+  #define ADD_CHILD_TIMER(profile, name, parent) \
+      (profile)->AddCounter(name, TUnit::TIME_NS, parent)
+  #define SCOPED_TIMER(c) \
+      ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
+  #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \
+      ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled)
+  #define COUNTER_ADD(c, v) (c)->Add(v)
+  #define COUNTER_SET(c, v) (c)->Set(v)
+  #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->AddThreadCounters(prefix)
+  #define SCOPED_THREAD_COUNTER_MEASUREMENT(c) \
+    ThreadCounterMeasurement \
+      MACRO_CONCAT(SCOPED_THREAD_COUNTER_MEASUREMENT, __COUNTER__)(c)
+  #define SCOPED_CONCURRENT_COUNTER(c) \
+    ScopedStopWatch<RuntimeProfile::ConcurrentTimerCounter> \
+      MACRO_CONCAT(SCOPED_CONCURRENT_COUNTER, __COUNTER__)(c)
+#else
+  #define ADD_COUNTER(profile, name, unit) NULL
+  #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) NULL
+  #define ADD_TIMER(profile, name) NULL
+  #define ADD_CHILD_TIMER(profile, name, parent) NULL
+  #define SCOPED_TIMER(c)
+  #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled)
+  #define COUNTER_ADD(c, v)
+  #define COUNTER_SET(c, v)
+  #define ADD_THREAD_COUNTERS(profile, prefix) NULL
+  #define SCOPED_THREAD_COUNTER_MEASUREMENT(c)
+  #define SCOPED_CONCURRENT_COUNTER(c)
+#endif
+
+/// Counter that tracks two numbers: the current value and the largest value the
+/// counter has ever held. value() reports the latter (the high water mark).
+class RuntimeProfile::HighWaterMarkCounter : public RuntimeProfile::Counter {
+ public:
+  HighWaterMarkCounter(TUnit::type unit) : Counter(unit) {}
+
+  /// Adds 'delta' to the current value and raises the high water mark if needed.
+  virtual void Add(int64_t delta) {
+    UpdateMax(current_value_.Add(delta));
+  }
+
+  /// Attempts to add 'delta' to the current value. Returns false, leaving the current
+  /// value untouched, if the sum would exceed 'max'; otherwise returns true.
+  bool TryAdd(int64_t delta, int64_t max) {
+    int64_t observed;
+    int64_t proposed;
+    do {
+      observed = current_value_.Load();
+      proposed = observed + delta;
+      if (UNLIKELY(proposed > max)) return false;
+    } while (UNLIKELY(!current_value_.CompareAndSwap(observed, proposed)));
+    UpdateMax(proposed);
+    return true;
+  }
+
+  /// Overwrites the current value with 'v' and raises the high water mark if needed.
+  virtual void Set(int64_t v) {
+    current_value_.Store(v);
+    UpdateMax(v);
+  }
+
+  int64_t current_value() const { return current_value_.Load(); }
+
+ private:
+  /// Raises 'value_' to 'v' when 'v' is larger, retrying the compare-and-swap until
+  /// it succeeds or 'v' no longer exceeds 'value_'.
+  void UpdateMax(int64_t v) {
+    for (;;) {
+      const int64_t seen_max = value_.Load();
+      if (v <= seen_max) return; // Already high enough: avoid atomic update.
+      if (LIKELY(value_.CompareAndSwap(seen_max, v))) return;
+    }
+  }
+
+  /// Most recent value of the counter. The inherited value_ holds the high water
+  /// mark instead.
+  AtomicInt64 current_value_;
+};
+
+/// A Counter whose value is not stored but computed on demand by a caller-supplied
+/// function. Like other counters it has a unit. Do not call Set() and Add().
+class RuntimeProfile::DerivedCounter : public RuntimeProfile::Counter {
+ public:
+  /// 'counter_fn' is copied into this counter and invoked by every value() call.
+  DerivedCounter(TUnit::type unit, const DerivedCounterFunction& counter_fn)
+    : Counter(unit), counter_fn_(counter_fn) {}
+
+  /// Returns whatever the derived-counter function currently evaluates to.
+  virtual int64_t value() const { return counter_fn_(); }
+
+ private:
+  /// Function evaluated by value() to produce the counter's value.
+  DerivedCounterFunction counter_fn_;
+};
+
+/// An AveragedCounter maintains a set of counters and its value is the
+/// average of the values in that set. The average is updated through calls
+/// to UpdateCounter(), which may add a new counter or update an existing counter.
+/// Set() and Add() should not be called.
+class RuntimeProfile::AveragedCounter : public RuntimeProfile::Counter {
+ public:
+  AveragedCounter(TUnit::type unit)
+   : Counter(unit),
+     current_double_sum_(0.0),
+     current_int_sum_(0) {}
+
+  /// Records the latest value of 'new_counter' in counter_value_map_ (adding it if it is
+  /// new) and recomputes the average. DOUBLE_VALUE averages are stored in value_ as the
+  /// raw bits of a double. No locks are taken in this class because UpdateCounter()
+  /// is called from UpdateAverage(), which locks the entire counter map in a profile.
+  void UpdateCounter(Counter* new_counter) {
+    DCHECK_EQ(new_counter->unit_, unit_);
+    boost::unordered_map<Counter*, int64_t>::iterator it =
+        counter_value_map_.find(new_counter);
+    int64_t old_val = 0;
+    if (it != counter_value_map_.end()) {
+      old_val = it->second;
+      it->second = new_counter->value();
+    } else {
+      counter_value_map_[new_counter] = new_counter->value();
+    }
+
+    if (unit_ == TUnit::DOUBLE_VALUE) {
+      double old_double_val = *reinterpret_cast<double*>(&old_val);
+      current_double_sum_ += (new_counter->double_value() - old_double_val);
+      double result_val = current_double_sum_ / (double) counter_value_map_.size();
+      value_.Store(*reinterpret_cast<int64_t*>(&result_val));
+    } else {
+      current_int_sum_ += (new_counter->value() - old_val);
+      // Divide as signed: unsigned size() would otherwise wrap a negative sum.
+      value_.Store(current_int_sum_ / static_cast<int64_t>(counter_value_map_.size()));
+    }
+  }
+
+  /// The value for this counter should be updated through UpdateCounter().
+  /// Set() and Add() should not be used.
+  virtual void Set(double value) {
+    DCHECK(false);
+  }
+
+  virtual void Set(int64_t value) {
+    DCHECK(false);
+  }
+
+  virtual void Add(int64_t delta) {
+    DCHECK(false);
+  }
+
+ private:
+  /// Map from counters to their existing values. Modified via UpdateCounter().
+  boost::unordered_map<Counter*, int64_t> counter_value_map_;
+
+  /// Current sums of values from counter_value_map_. Only one of these is used,
+  /// depending on the unit of the counter. current_double_sum_ is used for
+  /// DOUBLE_VALUE, current_int_sum_ otherwise.
+  double current_double_sum_;
+  int64_t current_int_sum_;
+};
+
+/// Thread usage counters (time and context switches), accessible only to friend classes.
+class RuntimeProfile::ThreadCounters {
+ private:
+  friend class ThreadCounterMeasurement;
+  friend class RuntimeProfile;
+
+  Counter* total_time_; // total wall clock time
+  Counter* user_time_;  // user CPU time
+  Counter* sys_time_;   // system CPU time
+
+  /// The number of context switches that occurred because a process voluntarily gave
+  /// up the processor before its time slice was completed.
+  Counter* voluntary_context_switches_;
+
+  /// The number of context switches that occurred because a higher priority process
+  /// became runnable or because the current process exceeded its time slice.
+  Counter* involuntary_context_switches_;
+};
+
+/// An EventSequence captures a sequence of events (each added by
+/// calling MarkEvent). Each event has a text label, and a time
+/// (measured relative to the moment Start() was called as t=0). It is
+/// useful for tracking the evolution of some serial process, such as
+/// the query lifecycle.
+class RuntimeProfile::EventSequence {
+ public:
+  EventSequence() { }
+
+  /// Helper constructor for building from Thrift; pairs labels[i] with timestamps[i].
+  EventSequence(const std::vector<int64_t>& timestamps,
+                const std::vector<std::string>& labels) {
+    DCHECK(timestamps.size() == labels.size());
+    for (size_t i = 0; i < timestamps.size(); ++i) {
+      events_.push_back(make_pair(labels[i], timestamps[i]));
+    }
+  }
+
+  /// Starts the timer without resetting it.
+  void Start() { sw_.Start(); }
+
+  /// Stores an event in sequence with the given label and the current time
+  /// (relative to the first time Start() was called) as the timestamp.
+  void MarkEvent(const std::string& label) {
+    Event event = make_pair(label, sw_.ElapsedTime());
+    boost::lock_guard<SpinLock> event_lock(lock_);
+    events_.push_back(event);
+  }
+
+  int64_t ElapsedTime() { return sw_.ElapsedTime(); }
+
+  /// An Event is a <label, timestamp> pair.
+  typedef std::pair<std::string, int64_t> Event;
+
+  /// An EventList is a sequence of Events, in increasing timestamp order.
+  typedef std::vector<Event> EventList;
+
+  /// Copies the member events_ into the supplied vector 'events'.
+  /// The supplied vector 'events' is cleared before this.
+  void GetEvents(std::vector<Event>* events) {
+    events->clear();
+    boost::lock_guard<SpinLock> event_lock(lock_);
+    events->insert(events->end(), events_.begin(), events_.end());
+  }
+
+  void ToThrift(TEventSequence* seq) const;
+
+ private:
+  /// Protect access to events_.
+  SpinLock lock_;
+
+  /// Stored in increasing time order.
+  EventList events_;
+
+  /// Timer which allows events to be timestamped when they are recorded.
+  MonotonicStopWatch sw_;
+};
+
+typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
+/// A named counter that keeps a series of samples taken from a sample function.
+class RuntimeProfile::TimeSeriesCounter {
+ public:
+  std::string DebugString() const;
+
+  /// Takes one sample from sample_fn_ and records it with 'ms_elapsed'. Must not be
+  /// called on a read-only counter, which has no sample function.
+  void AddSample(int ms_elapsed) {
+    DCHECK(!sample_fn_.empty());
+    samples_.AddSample(sample_fn_(), ms_elapsed);
+  }
+
+ private:
+  friend class RuntimeProfile;
+
+  TimeSeriesCounter(const std::string& name, TUnit::type unit, DerivedCounterFunction fn)
+    : name_(name), unit_(unit), sample_fn_(fn) {}
+
+  /// Construct a time series object from existing sample data. This counter
+  /// is then read-only (i.e. there is no sample function).
+  TimeSeriesCounter(const std::string& name, TUnit::type unit, int period,
+      const std::vector<int64_t>& values)
+    : name_(name), unit_(unit), sample_fn_(NULL), samples_(period, values) {}
+
+  void ToThrift(TTimeSeriesCounter* counter);
+
+  std::string name_;
+  TUnit::type unit_;
+  DerivedCounterFunction sample_fn_;
+  StreamingCounterSampler samples_;
+};
+
+/// Counter whose value comes from an internal ConcurrentStopWatch, which tracks the
+/// concurrent running time of multiple threads.
+class RuntimeProfile::ConcurrentTimerCounter : public Counter {
+ public:
+  ConcurrentTimerCounter(TUnit::type unit) : Counter(unit) {}
+
+  virtual int64_t value() const { return csw_.TotalRunningTime(); }
+
+  void Start() { csw_.Start(); }
+
+  void Stop() { csw_.Stop(); }
+
+  /// Returns lap time for caller who wants delta update of concurrent running time.
+  uint64_t LapTime() { return csw_.LapTime(); }
+
+  /// The value of this counter comes only from the internal ConcurrentStopWatch.
+  /// Set() and Add() must not be called; each of them just hits DCHECK(false).
+  virtual void Set(double value) {
+    DCHECK(false);
+  }
+
+  virtual void Set(int64_t value) {
+    DCHECK(false);
+  }
+
+  virtual void Set(int value) {
+    DCHECK(false);
+  }
+
+  virtual void Add(int64_t delta) {
+    DCHECK(false);
+  }
+
+ private:
+  ConcurrentStopWatch csw_;
+};
+
+/// Utility class that marks 'label' on 'event_sequence' when the object is destroyed.
+class ScopedEvent {
+ public:
+  ScopedEvent(RuntimeProfile::EventSequence* event_sequence, const std::string& label)
+    : label_(label),
+      event_sequence_(event_sequence) {
+  }
+
+  /// Marks label_ on event_sequence_, which is dereferenced without a NULL check.
+  ~ScopedEvent() {
+    event_sequence_->MarkEvent(label_);
+  }
+
+ private:
+  /// Declared but not defined, to disable copy construction and assignment.
+  ScopedEvent(const ScopedEvent& event);
+  ScopedEvent& operator=(const ScopedEvent& event);
+
+  const std::string label_;
+  RuntimeProfile::EventSequence* event_sequence_;
+};
+
+/// Utility class to update time elapsed when the object goes out of scope.
+/// 'T' must implement the StopWatch "interface" (Start,Stop,ElapsedTime) but
+/// we use templates not to pay for virtual function overhead. In some cases
+/// the runtime profile may be deleted while the counter is still active. In this
+/// case the is_cancelled argument can be provided so that ScopedTimer will not
+/// update the counter when the query is cancelled. The destructor for ScopedTimer
+/// can access both is_cancelled and the counter, so the caller must ensure that it
+/// is safe to access both at the end of the scope in which the timer is used.
+template<class T>
+class ScopedTimer {
+ public:
+  ScopedTimer(RuntimeProfile::Counter* counter, const bool* is_cancelled = NULL) :
+    counter_(counter), is_cancelled_(is_cancelled){
+    if (counter == NULL) return;
+    DCHECK(counter->unit() == TUnit::TIME_NS);
+    sw_.Start();
+  }
+
+  void Stop() { sw_.Stop(); }
+  void Start() { sw_.Start(); }
+
+  void UpdateCounter() {
+    if (counter_ != NULL && !IsCancelled()) {
+      counter_->Add(sw_.ElapsedTime());
+    }
+  }
+
+  /// Updates the underlying counter for the final time and clears the pointer to it.
+  void ReleaseCounter() {
+    UpdateCounter();
+    counter_ = NULL;
+  }
+
+  bool IsCancelled() {
+    return is_cancelled_ != NULL && *is_cancelled_;
+  }
+
+  /// Stops the stopwatch and updates the counter when the object is destroyed.
+  ~ScopedTimer() {
+    sw_.Stop();
+    UpdateCounter();
+  }
+
+ private:
+  /// Declared but not defined, to disable copy construction and assignment.
+  ScopedTimer(const ScopedTimer& timer);
+  ScopedTimer& operator=(const ScopedTimer& timer);
+
+  T sw_;
+  RuntimeProfile::Counter* counter_;
+  const bool* is_cancelled_;
+};
+
+
+#ifdef __APPLE__
+// On OS X rusage via thread is not supported. In addition, the majority of the fields of
+// the usage structs will be zeroed out. Since Apple is not going to be a major platform
+// initially it will most likely be enough to capture only time.
+// C.f. http://blog.kuriositaet.de/?p=257
+#define RUSAGE_THREAD RUSAGE_SELF
+#endif
+
+/// Utility class to update ThreadCounters when the object goes out of scope or when Stop
+/// is called. Thread measurements are taken using getrusage.
+/// This is ~5x slower than ScopedTimer due to calling getrusage.
+class ThreadCounterMeasurement {
+ public:
+  ThreadCounterMeasurement(RuntimeProfile::ThreadCounters* counters) :
+    stop_(false), counters_(counters) {
+    DCHECK(counters != NULL);
+    sw_.Start();
+    int ret = getrusage(RUSAGE_THREAD, &usage_base_);
+    DCHECK_EQ(ret, 0);
+  }
+
+  /// Stop and update the counters. Calls after the first one are no-ops.
+  void Stop() {
+    if (stop_) return;
+    stop_ = true;
+    sw_.Stop();
+    rusage usage;
+    int ret = getrusage(RUSAGE_THREAD, &usage);
+    DCHECK_EQ(ret, 0);
+    int64_t utime_diff =
+        (usage.ru_utime.tv_sec - usage_base_.ru_utime.tv_sec) * 1000L * 1000L * 1000L +
+        (usage.ru_utime.tv_usec - usage_base_.ru_utime.tv_usec) * 1000L;
+    int64_t stime_diff =
+        (usage.ru_stime.tv_sec - usage_base_.ru_stime.tv_sec) * 1000L * 1000L * 1000L +
+        (usage.ru_stime.tv_usec - usage_base_.ru_stime.tv_usec) * 1000L;
+    counters_->total_time_->Add(sw_.ElapsedTime());
+    counters_->user_time_->Add(utime_diff);
+    counters_->sys_time_->Add(stime_diff);
+    counters_->voluntary_context_switches_->Add(usage.ru_nvcsw - usage_base_.ru_nvcsw);
+    counters_->involuntary_context_switches_->Add(
+        usage.ru_nivcsw - usage_base_.ru_nivcsw);
+  }
+
+  /// Update the counters when the object is destroyed, unless Stop() already did.
+  ~ThreadCounterMeasurement() {
+    Stop();
+  }
+
+ private:
+  /// Declared but not defined, to disable copy construction and assignment.
+  ThreadCounterMeasurement(const ThreadCounterMeasurement& timer);
+  ThreadCounterMeasurement& operator=(const ThreadCounterMeasurement& timer);
+
+  bool stop_;
+  rusage usage_base_;
+  MonotonicStopWatch sw_;
+  RuntimeProfile::ThreadCounters* counters_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index d98b54c..2c80082 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -20,7 +20,7 @@
 #include "common/object-pool.h"
 #include "util/cpu-info.h"
 #include "util/periodic-counter-updater.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 #include "util/streaming-sampler.h"
 #include "util/thread.h"
 #include "util/time.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index bc7447a..714180a 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
 
 #include <iomanip>
 #include <iostream>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 8030d7b..98a02ed 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -17,64 +17,16 @@
 #define IMPALA_UTIL_RUNTIME_PROFILE_H
 
 #include <boost/function.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/unordered_map.hpp>
+#include <boost/thread/lock_guard.hpp>
 #include <iostream>
-#include <sys/time.h>
-#include <sys/resource.h>
 
 #include "common/atomic.h"
-#include "common/logging.h"
-#include "common/object-pool.h"
-#include "util/stopwatch.h"
-#include "util/streaming-sampler.h"
+#include "util/spinlock.h"
+
 #include "gen-cpp/RuntimeProfile_types.h"
 
 namespace impala {
 
-/// Define macros for updating counters.  The macros make it very easy to disable
-/// all counters at compile time.  Set this to 0 to remove counters.  This is useful
-/// to do to make sure the counters aren't affecting the system.
-#define ENABLE_COUNTERS 1
-
-/// Some macro magic to generate unique ids using __COUNTER__
-#define CONCAT_IMPL(x, y) x##y
-#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
-
-#if ENABLE_COUNTERS
-  #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
-  #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
-      (profile)->AddTimeSeriesCounter(name, src_counter)
-  #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
-  #define ADD_CHILD_TIMER(profile, name, parent) \
-      (profile)->AddCounter(name, TUnit::TIME_NS, parent)
-  #define SCOPED_TIMER(c) \
-      ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
-  #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \
-      ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled)
-  #define COUNTER_ADD(c, v) (c)->Add(v)
-  #define COUNTER_SET(c, v) (c)->Set(v)
-  #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->AddThreadCounters(prefix)
-  #define SCOPED_THREAD_COUNTER_MEASUREMENT(c) \
-    ThreadCounterMeasurement \
-      MACRO_CONCAT(SCOPED_THREAD_COUNTER_MEASUREMENT, __COUNTER__)(c)
-  #define SCOPED_CONCURRENT_COUNTER(c) \
-    ScopedStopWatch<RuntimeProfile::ConcurrentTimerCounter> \
-      MACRO_CONCAT(SCOPED_CONCURRENT_COUNTER, __COUNTER__)(c)
-#else
-  #define ADD_COUNTER(profile, name, unit) NULL
-  #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) NULL
-  #define ADD_TIMER(profile, name) NULL
-  #define ADD_CHILD_TIMER(profile, name, parent) NULL
-  #define SCOPED_TIMER(c)
-  #define CANCEL_SAFE_SCOPED_TIMER(c)
-  #define COUNTER_ADD(c, v)
-  #define COUNTER_SET(c, v)
-  #define ADD_THREAD_COUNTERS(profile, prefix) NULL
-  #define SCOPED_THREAD_COUNTER_MEASUREMENT(c)
-  #define SCOPED_CONCURRENT_COUNTER(c)
-#endif
-
 class ObjectPool;
 
 /// Runtime profile is a group of profiling counters.  It supports adding named counters
@@ -133,285 +85,16 @@ class RuntimeProfile {
     TUnit::type unit_;
   };
 
-  /// A counter that keeps track of the highest value seen (reporting that
-  /// as value()) and the current value.
-  class HighWaterMarkCounter : public Counter {
-   public:
-    HighWaterMarkCounter(TUnit::type unit) : Counter(unit) {}
-
-    virtual void Add(int64_t delta) {
-      int64_t new_val = current_value_.Add(delta);
-      UpdateMax(new_val);
-    }
-
-    /// Tries to increase the current value by delta. If current_value() + delta
-    /// exceeds max, return false and current_value is not changed.
-    bool TryAdd(int64_t delta, int64_t max) {
-      while (true) {
-        int64_t old_val = current_value_.Load();
-        int64_t new_val = old_val + delta;
-        if (UNLIKELY(new_val > max)) return false;
-        if (LIKELY(current_value_.CompareAndSwap(old_val, new_val))) {
-          UpdateMax(new_val);
-          return true;
-        }
-      }
-    }
-
-    virtual void Set(int64_t v) {
-      current_value_.Store(v);
-      UpdateMax(v);
-    }
-
-    int64_t current_value() const { return current_value_.Load(); }
-
-   private:
-    /// Set 'value_' to 'v' if 'v' is larger than 'value_'. The entire operation is
-    /// atomic.
-    void UpdateMax(int64_t v) {
-      while (true) {
-        int64_t old_max = value_.Load();
-        int64_t new_max = std::max(old_max, v);
-        if (new_max == old_max) break; // Avoid atomic update.
-        if (LIKELY(value_.CompareAndSwap(old_max, new_max))) break;
-      }
-    }
-
-    /// The current value of the counter. value_ in the super class represents
-    /// the high water mark.
-    AtomicInt64 current_value_;
-  };
+  class AveragedCounter;
+  class ConcurrentTimerCounter;
+  class DerivedCounter;
+  class EventSequence;
+  class HighWaterMarkCounter;
+  class ThreadCounters;
+  class TimeSeriesCounter;
 
   typedef boost::function<int64_t ()> DerivedCounterFunction;
 
-  /// A DerivedCounter also has a name and unit, but the value is computed.
-  /// Do not call Set() and Add().
-  class DerivedCounter : public Counter {
-   public:
-    DerivedCounter(TUnit::type unit, const DerivedCounterFunction& counter_fn)
-      : Counter(unit),
-        counter_fn_(counter_fn) {}
-
-    virtual int64_t value() const {
-      return counter_fn_();
-    }
-
-   private:
-    DerivedCounterFunction counter_fn_;
-  };
-
-  /// An AveragedCounter maintains a set of counters and its value is the
-  /// average of the values in that set. The average is updated through calls
-  /// to UpdateCounter(), which may add a new counter or update an existing counter.
-  /// Set() and Add() should not be called.
-  class AveragedCounter : public Counter {
-   public:
-    AveragedCounter(TUnit::type unit)
-     : Counter(unit),
-       current_double_sum_(0.0),
-       current_int_sum_(0) {
-    }
-
-    /// Update counter_value_map_ with the new counter. This may require the counter
-    /// to be added to the map.
-    /// No locks are obtained within this class because UpdateCounter() is called from
-    /// UpdateAverage(), which obtains locks on the entire counter map in a profile.
-    void UpdateCounter(Counter* new_counter) {
-      DCHECK_EQ(new_counter->unit_, unit_);
-      boost::unordered_map<Counter*, int64_t>::iterator it =
-          counter_value_map_.find(new_counter);
-      int64_t old_val = 0;
-      if (it != counter_value_map_.end()) {
-        old_val = it->second;
-        it->second = new_counter->value();
-      } else {
-        counter_value_map_[new_counter] = new_counter->value();
-      }
-
-      if (unit_ == TUnit::DOUBLE_VALUE) {
-        double old_double_val = *reinterpret_cast<double*>(&old_val);
-        current_double_sum_ += (new_counter->double_value() - old_double_val);
-        double result_val = current_double_sum_ / (double) counter_value_map_.size();
-        value_.Store(*reinterpret_cast<int64_t*>(&result_val));
-      } else {
-        current_int_sum_ += (new_counter->value() - old_val);
-        value_.Store(current_int_sum_ / counter_value_map_.size());
-      }
-    }
-
-    /// The value for this counter should be updated through UpdateCounter().
-    /// Set() and Add() should not be used.
-    virtual void Set(double value) {
-      DCHECK(false);
-    }
-
-    virtual void Set(int64_t value) {
-      DCHECK(false);
-    }
-
-    virtual void Add(int64_t delta) {
-      DCHECK(false);
-    }
-
-   private:
-    /// Map from counters to their existing values. Modified via UpdateCounter().
-    boost::unordered_map<Counter*, int64_t> counter_value_map_;
-
-    /// Current sums of values from counter_value_map_. Only one of these is used,
-    /// depending on the unit of the counter. current_double_sum_ is used for
-    /// DOUBLE_VALUE, current_int_sum_ otherwise.
-    double current_double_sum_;
-    int64_t current_int_sum_;
-  };
-
-  /// A set of counters that measure thread info, such as total time, user time, sys time.
-  class ThreadCounters {
-   private:
-    friend class ThreadCounterMeasurement;
-    friend class RuntimeProfile;
-
-    Counter* total_time_; // total wall clock time
-    Counter* user_time_;  // user CPU time
-    Counter* sys_time_;   // system CPU time
-
-    /// The number of times a context switch resulted due to a process voluntarily giving
-    /// up the processor before its time slice was completed.
-    Counter* voluntary_context_switches_;
-
-    /// The number of times a context switch resulted due to a higher priority process
-    /// becoming runnable or because the current process exceeded its time slice.
-    Counter* involuntary_context_switches_;
-  };
-
-  /// An EventSequence captures a sequence of events (each added by
-  /// calling MarkEvent). Each event has a text label, and a time
-  /// (measured relative to the moment Start() was called as t=0). It is
-  /// useful for tracking the evolution of some serial process, such as
-  /// the query lifecycle.
-  class EventSequence {
-   public:
-    EventSequence() { }
-
-    /// Helper constructor for building from Thrift
-    EventSequence(const std::vector<int64_t>& timestamps,
-                  const std::vector<std::string>& labels) {
-      DCHECK(timestamps.size() == labels.size());
-      for (int i = 0; i < timestamps.size(); ++i) {
-        events_.push_back(make_pair(labels[i], timestamps[i]));
-      }
-    }
-
-    /// Starts the timer without resetting it.
-    void Start() { sw_.Start(); }
-
-    /// Stores an event in sequence with the given label and the current time
-    /// (relative to the first time Start() was called) as the timestamp.
-    void MarkEvent(const std::string& label) {
-      Event event = make_pair(label, sw_.ElapsedTime());
-      boost::lock_guard<SpinLock> event_lock(lock_);
-      events_.push_back(event);
-    }
-
-    int64_t ElapsedTime() { return sw_.ElapsedTime(); }
-
-    /// An Event is a <label, timestamp> pair.
-    typedef std::pair<std::string, int64_t> Event;
-
-    /// An EventList is a sequence of Events, in increasing timestamp order.
-    typedef std::vector<Event> EventList;
-
-    /// Copies the member events_ into the supplied vector 'events'.
-    /// The supplied vector 'events' is cleared before this.
-    void GetEvents(std::vector<Event>* events) {
-      events->clear();
-      boost::lock_guard<SpinLock> event_lock(lock_);
-      events->insert(events->end(), events_.begin(), events_.end());
-    }
-
-    void ToThrift(TEventSequence* seq) const;
-
-   private:
-    /// Protect access to events_.
-    SpinLock lock_;
-
-    /// Stored in increasing time order.
-    EventList events_;
-
-    /// Timer which allows events to be timestamped when they are recorded.
-    MonotonicStopWatch sw_;
-  };
-
-  typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
-  class TimeSeriesCounter {
-   public:
-    std::string DebugString() const;
-
-    void AddSample(int ms_elapsed) {
-      int64_t sample = sample_fn_();
-      samples_.AddSample(sample, ms_elapsed);
-    }
-
-   private:
-    friend class RuntimeProfile;
-
-    TimeSeriesCounter(const std::string& name, TUnit::type unit,
-        DerivedCounterFunction fn)
-      : name_(name), unit_(unit), sample_fn_(fn) {
-    }
-
-    /// Construct a time series object from existing sample data. This counter
-    /// is then read-only (i.e. there is no sample function).
-    TimeSeriesCounter(const std::string& name, TUnit::type unit, int period,
-        const std::vector<int64_t>& values)
-      : name_(name), unit_(unit), sample_fn_(NULL), samples_(period, values) {
-    }
-
-    void ToThrift(TTimeSeriesCounter* counter);
-
-    std::string name_;
-    TUnit::type unit_;
-    DerivedCounterFunction sample_fn_;
-    StreamingCounterSampler samples_;
-  };
-
-  /// Counter whose value comes from an internal ConcurrentStopWatch to track multiple threads
-  /// concurrent running time.
-  class ConcurrentTimerCounter : public Counter {
-   public:
-    ConcurrentTimerCounter(TUnit::type unit) : Counter(unit) {}
-
-    virtual int64_t value() const { return csw_.TotalRunningTime(); }
-
-    void Start() { csw_.Start(); }
-
-    void Stop() { csw_.Stop(); }
-
-    /// Returns lap time for caller who wants delta update of concurrent running time.
-    uint64_t LapTime() { return csw_.LapTime(); }
-
-    /// The value for this counter should come from internal ConcurrentStopWatch.
-    /// Set() and Add() should not be used.
-    virtual void Set(double value) {
-      DCHECK(false);
-    }
-
-    virtual void Set(int64_t value) {
-      DCHECK(false);
-    }
-
-    virtual void Set(int value) {
-      DCHECK(false);
-    }
-
-    virtual void Add(int64_t delta) {
-      DCHECK(false);
-    }
-
-   private:
-    ConcurrentStopWatch csw_;
-  };
-
-
   /// Create a runtime profile object with 'name'.  Counters and merged profile are
   /// allocated from pool.
   /// If is_averaged_profile is true, the counters in this profile will be derived
@@ -721,166 +404,6 @@ class RuntimeProfile {
       const ChildCounterMap& child_counter_map, std::ostream* s);
 };
 
-/// Utility class to mark an event when the object is destroyed.
-class ScopedEvent {
- public:
-  ScopedEvent(RuntimeProfile::EventSequence* event_sequence, const std::string& label)
-    : label_(label),
-      event_sequence_(event_sequence) {
-  }
-
-  /// Mark the event when the object is destroyed
-  ~ScopedEvent() {
-    event_sequence_->MarkEvent(label_);
-  }
-
- private:
-  /// Disable copy constructor and assignment
-  ScopedEvent(const ScopedEvent& event);
-  ScopedEvent& operator=(const ScopedEvent& event);
-
-  const std::string label_;
-  RuntimeProfile::EventSequence* event_sequence_;
-};
-
-/// Utility class to update the counter at object construction and destruction.
-/// When the object is constructed, decrement the counter by val.
-/// When the object goes out of scope, increment the counter by val.
-class ScopedCounter {
- public:
-  ScopedCounter(RuntimeProfile::Counter* counter, int64_t val) :
-    val_(val),
-    counter_(counter) {
-    if (counter == NULL) return;
-    counter_->Add(-1L * val_);
-  }
-
-  /// Increment the counter when object is destroyed
-  ~ScopedCounter() {
-    if (counter_ != NULL) counter_->Add(val_);
-  }
-
- private:
-  /// Disable copy constructor and assignment
-  ScopedCounter(const ScopedCounter& counter);
-  ScopedCounter& operator=(const ScopedCounter& counter);
-
-  int64_t val_;
-  RuntimeProfile::Counter* counter_;
-};
-
-/// Utility class to update time elapsed when the object goes out of scope.
-/// 'T' must implement the StopWatch "interface" (Start,Stop,ElapsedTime) but
-/// we use templates not to pay for virtual function overhead. In some cases
-/// the runtime profile may be deleted while the counter is still active. In this
-/// case the is_cancelled argument can be provided so that ScopedTimer will not
-/// update the counter when the query is cancelled. The destructor for ScopedTimer
-/// can access both is_cancelled and the counter, so the caller must ensure that it
-/// is safe to access both at the end of the scope in which the timer is used.
-template<class T>
-class ScopedTimer {
- public:
-  ScopedTimer(RuntimeProfile::Counter* counter, const bool* is_cancelled = NULL) :
-    counter_(counter), is_cancelled_(is_cancelled){
-    if (counter == NULL) return;
-    DCHECK(counter->unit() == TUnit::TIME_NS);
-    sw_.Start();
-  }
-
-  void Stop() { sw_.Stop(); }
-  void Start() { sw_.Start(); }
-
-  void UpdateCounter() {
-    if (counter_ != NULL && !IsCancelled()) {
-      counter_->Add(sw_.ElapsedTime());
-    }
-  }
-
-  /// Updates the underlying counter for the final time and clears the pointer to it.
-  void ReleaseCounter() {
-    UpdateCounter();
-    counter_ = NULL;
-  }
-
-  bool IsCancelled() {
-    return is_cancelled_ != NULL && *is_cancelled_;
-  }
-
-  /// Update counter when object is destroyed
-  ~ScopedTimer() {
-    sw_.Stop();
-    UpdateCounter();
-  }
-
- private:
-  /// Disable copy constructor and assignment
-  ScopedTimer(const ScopedTimer& timer);
-  ScopedTimer& operator=(const ScopedTimer& timer);
-
-  T sw_;
-  RuntimeProfile::Counter* counter_;
-  const bool* is_cancelled_;
-};
-
-#ifdef __APPLE__
-// On OS X rusage via thread is not supported. In addition, the majority of the fields of
-// the usage structs will be zeroed out. Since Apple is not going to be a major plaform
-// initially it will most likely be enough to capture only time.
-// C.f. http://blog.kuriositaet.de/?p=257
-#define RUSAGE_THREAD RUSAGE_SELF
-#endif
-
-/// Utility class to update ThreadCounter when the object goes out of scope or when Stop is
-/// called. Threads measurements will then be taken using getrusage.
-/// This is ~5x slower than ScopedTimer due to calling getrusage.
-class ThreadCounterMeasurement {
- public:
-  ThreadCounterMeasurement(RuntimeProfile::ThreadCounters* counters) :
-    stop_(false), counters_(counters) {
-    DCHECK(counters != NULL);
-    sw_.Start();
-    int ret = getrusage(RUSAGE_THREAD, &usage_base_);
-    DCHECK_EQ(ret, 0);
-  }
-
-  /// Stop and update the counter
-  void Stop() {
-    if (stop_) return;
-    stop_ = true;
-    sw_.Stop();
-    rusage usage;
-    int ret = getrusage(RUSAGE_THREAD, &usage);
-    DCHECK_EQ(ret, 0);
-    int64_t utime_diff =
-        (usage.ru_utime.tv_sec - usage_base_.ru_utime.tv_sec) * 1000L * 1000L * 1000L +
-        (usage.ru_utime.tv_usec - usage_base_.ru_utime.tv_usec) * 1000L;
-    int64_t stime_diff =
-        (usage.ru_stime.tv_sec - usage_base_.ru_stime.tv_sec) * 1000L * 1000L * 1000L +
-        (usage.ru_stime.tv_usec - usage_base_.ru_stime.tv_usec) * 1000L;
-    counters_->total_time_->Add(sw_.ElapsedTime());
-    counters_->user_time_->Add(utime_diff);
-    counters_->sys_time_->Add(stime_diff);
-    counters_->voluntary_context_switches_->Add(usage.ru_nvcsw - usage_base_.ru_nvcsw);
-    counters_->involuntary_context_switches_->Add(
-        usage.ru_nivcsw - usage_base_.ru_nivcsw);
-  }
-
-  /// Update counter when object is destroyed
-  ~ThreadCounterMeasurement() {
-    Stop();
-  }
-
- private:
-  /// Disable copy constructor and assignment
-  ThreadCounterMeasurement(const ThreadCounterMeasurement& timer);
-  ThreadCounterMeasurement& operator=(const ThreadCounterMeasurement& timer);
-
-  bool stop_;
-  rusage usage_base_;
-  MonotonicStopWatch sw_;
-  RuntimeProfile::ThreadCounters* counters_;
-};
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/simple-logger.cc
----------------------------------------------------------------------
diff --git a/be/src/util/simple-logger.cc b/be/src/util/simple-logger.cc
index 6fca3fa..87ef0f5 100644
--- a/be/src/util/simple-logger.cc
+++ b/be/src/util/simple-logger.cc
@@ -18,6 +18,7 @@
 #include <boost/date_time/posix_time/posix_time_types.hpp>
 #include <boost/filesystem.hpp>
 #include <gutil/strings/substitute.h>
+#include <boost/thread/lock_guard.hpp>
 
 #include "common/names.h"
 #include "util/logging-support.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/simple-logger.h
----------------------------------------------------------------------
diff --git a/be/src/util/simple-logger.h b/be/src/util/simple-logger.h
index 66031fd..af17d26 100644
--- a/be/src/util/simple-logger.h
+++ b/be/src/util/simple-logger.h
@@ -15,8 +15,8 @@
 #ifndef IMPALA_SERVICE_SIMPLE_LOGGER_H
 #define IMPALA_SERVICE_SIMPLE_LOGGER_H
 
-#include <boost/thread/thread.hpp>
 #include <fstream>
+#include <boost/thread/mutex.hpp>
 
 #include "common/status.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/streaming-sampler.h
----------------------------------------------------------------------
diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h
index 9cf4151..71e7548 100644
--- a/be/src/util/streaming-sampler.h
+++ b/be/src/util/streaming-sampler.h
@@ -17,6 +17,8 @@
 
 #include <string.h>
 #include <iostream>
+#include <boost/thread/lock_guard.hpp>
+
 #include "util/spinlock.h"
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/tuple-row-compare.cc
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc
index a0bbf97..18ae53d 100644
--- a/be/src/util/tuple-row-compare.cc
+++ b/be/src/util/tuple-row-compare.cc
@@ -19,6 +19,7 @@
 #include "codegen/codegen-anyval.h"
 #include "codegen/llvm-codegen.h"
 #include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
 
 using namespace impala;
 using namespace llvm;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/tuple-row-compare.h
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index 8505f19..d20d864 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -19,10 +19,11 @@
 #include "exec/sort-exec-exprs.h"
 #include "exprs/expr.h"
 #include "exprs/expr-context.h"
+#include "runtime/descriptors.h"
+#include "runtime/raw-value.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/tuple.h"
 #include "runtime/tuple-row.h"
-#include "runtime/descriptors.h"
 
 namespace impala {