You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/26 02:41:52 UTC
[2/4] incubator-impala git commit: Refactor RuntimeState and ExecEnv
dependencies
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 8c423c0..c970163 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include "runtime/runtime-state.h"
+
#include <iostream>
#include <jni.h>
#include <sstream>
@@ -27,11 +29,11 @@
#include "exprs/expr.h"
#include "runtime/buffered-block-mgr.h"
#include "runtime/descriptors.h"
-#include "runtime/runtime-state.h"
-#include "runtime/timestamp-value.h"
#include "runtime/data-stream-mgr.h"
#include "runtime/data-stream-recvr.h"
-#include "runtime/runtime-filter.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/runtime-filter-bank.h"
+#include "runtime/timestamp-value.h"
#include "util/bitmap.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
@@ -75,7 +77,7 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params,
is_cancelled_(false),
query_resource_mgr_(NULL),
root_node_id_(-1),
- filter_bank_(fragment_ctx().query_ctx, this) {
+ filter_bank_(new RuntimeFilterBank(fragment_ctx().query_ctx, this)) {
Status status = Init(exec_env);
DCHECK(status.ok()) << status.GetDetail();
}
@@ -90,7 +92,7 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx)
is_cancelled_(false),
query_resource_mgr_(NULL),
root_node_id_(-1),
- filter_bank_(query_ctx, this) {
+ filter_bank_(new RuntimeFilterBank(query_ctx, this)) {
fragment_params_.fragment_instance_ctx.__set_query_ctx(query_ctx);
fragment_params_.fragment_instance_ctx.query_ctx.request.query_options
.__set_batch_size(DEFAULT_BATCH_SIZE);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 860692f..15c8d9c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -16,40 +16,34 @@
#ifndef IMPALA_RUNTIME_RUNTIME_STATE_H
#define IMPALA_RUNTIME_RUNTIME_STATE_H
-/// needed for scoped_ptr to work on ObjectPool
-#include "common/object-pool.h"
-
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <vector>
#include <string>
-/// stringstream is a typedef, so can't forward declare it.
-#include <sstream>
-#include "scheduling/query-resource-mgr.h"
+// NOTE: try not to add more headers here: runtime-state.h is included in many many files.
+#include "common/global-types.h" // for PlanNodeId
+#include "runtime/client-cache-types.h"
#include "runtime/exec-env.h"
-#include "runtime/descriptors.h" // for PlanNodeId
-#include "runtime/disk-io-mgr.h" // for DiskIoMgr::RequestContext
-#include "runtime/mem-tracker.h"
-#include "runtime/runtime-filter.h"
#include "runtime/thread-resource-mgr.h"
-#include "gen-cpp/PlanNodes_types.h"
-#include "gen-cpp/Types_types.h" // for TUniqueId
-#include "gen-cpp/ImpalaInternalService_types.h" // for TQueryOptions
-#include "util/auth-util.h"
+#include "util/auth-util.h" // for GetEffectiveUser()
#include "util/runtime-profile.h"
namespace impala {
class BufferedBlockMgr;
+class DataStreamRecvr;
class DescriptorTbl;
-class ObjectPool;
-class Status;
-class ExecEnv;
+class DiskIoRequestContext;
class Expr;
class LlvmCodeGen;
+class MemTracker;
+class ObjectPool;
+class RuntimeFilterBank;
+class Status;
class TimestampValue;
-class DataStreamRecvr;
+class TQueryOptions;
+class TUniqueId;
/// Counts how many rows an INSERT query has added to a particular partition
/// (partitions are identified by their partition keys: k1=v1/k2=v2
@@ -140,7 +134,7 @@ class RuntimeState {
ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
- std::vector<DiskIoMgr::RequestContext*>* reader_contexts() { return &reader_contexts_; }
+ std::vector<DiskIoRequestContext*>* reader_contexts() { return &reader_contexts_; }
void set_fragment_root_id(PlanNodeId id) {
DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
@@ -151,7 +145,7 @@ class RuntimeState {
/// See comment on root_node_id_. We add one to prevent having a hash seed of 0.
uint32_t fragment_hash_seed() const { return root_node_id_ + 1; }
- RuntimeFilterBank* filter_bank() { return &filter_bank_; }
+ RuntimeFilterBank* filter_bank() { return filter_bank_.get(); }
PartitionStatusMap* per_partition_status() { return &per_partition_status_; }
@@ -348,7 +342,7 @@ class RuntimeState {
QueryResourceMgr* query_resource_mgr_;
/// Reader contexts that need to be closed when the fragment is closed.
- std::vector<DiskIoMgr::RequestContext*> reader_contexts_;
+ std::vector<DiskIoRequestContext*> reader_contexts_;
/// BufferedBlockMgr object used to allocate and manage blocks of input data in memory
/// with a fixed memory budget.
@@ -366,7 +360,7 @@ class RuntimeState {
/// Manages runtime filters that are either produced or consumed (or both!) by plan
/// nodes that share this runtime state.
- RuntimeFilterBank filter_bank_;
+ boost::scoped_ptr<RuntimeFilterBank> filter_bank_;
/// prohibit copies
RuntimeState(const RuntimeState&);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index 0376437..328d1c2 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -18,7 +18,7 @@
#include "runtime/row-batch.h"
#include "runtime/sorter.h"
#include "runtime/tuple-row.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
#include "common/names.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index e0d388a..299dd2f 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -22,7 +22,7 @@
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/sorted-run-merger.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
#include "common/names.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/string-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/string-buffer.h b/be/src/runtime/string-buffer.h
index c8f79df..ec59806 100644
--- a/be/src/runtime/string-buffer.h
+++ b/be/src/runtime/string-buffer.h
@@ -18,6 +18,7 @@
#include "common/status.h"
#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
#include "runtime/string-value.h"
using namespace strings;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/thread-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.h b/be/src/runtime/thread-resource-mgr.h
index cb2c13e..00aaeba 100644
--- a/be/src/runtime/thread-resource-mgr.h
+++ b/be/src/runtime/thread-resource-mgr.h
@@ -18,10 +18,7 @@
#include <stdlib.h>
#include <boost/function.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
#include <list>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/tuple.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.cc b/be/src/runtime/tuple.cc
index fe85948..93e63f3 100644
--- a/be/src/runtime/tuple.cc
+++ b/be/src/runtime/tuple.cc
@@ -29,6 +29,7 @@
#include "runtime/string-value.h"
#include "runtime/tuple-row.h"
#include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
#include "common/names.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 70174a1..94e21b8 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -25,7 +25,7 @@
#include "runtime/mem-tracker.h"
#include "util/debug-util.h"
#include "util/time.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
#include "util/pretty-printer.h"
#include "common/names.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/query-resource-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.cc b/be/src/scheduling/query-resource-mgr.cc
index 5687269..bf42329 100644
--- a/be/src/scheduling/query-resource-mgr.cc
+++ b/be/src/scheduling/query-resource-mgr.cc
@@ -22,6 +22,7 @@
#include "runtime/exec-env.h"
#include "resourcebroker/resource-broker.h"
#include "util/bit-util.h"
+#include "util/cgroups-mgr.h"
#include "util/container-util.h"
#include "util/network-util.h"
#include "util/promise.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 7abc916..a14509c 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -24,6 +24,7 @@
#include "common/logging.h"
#include "util/metrics.h"
+#include "resourcebroker/resource-broker.h"
#include "runtime/exec-env.h"
#include "runtime/coordinator.h"
#include "service/impala-server.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index 680e295..38866f1 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -20,6 +20,7 @@
#include "gen-cpp/ImpalaInternalService.h"
#include "rpc/thrift-util.h"
#include "gutil/strings/substitute.h"
+#include "runtime/runtime-filter-bank.h"
#include "util/bloom-filter.h"
#include "runtime/backend-client.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/fragment-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc
index 5210f8c..f64ca7e 100644
--- a/be/src/service/fragment-mgr.cc
+++ b/be/src/service/fragment-mgr.cc
@@ -20,6 +20,7 @@
#include "service/fragment-exec-state.h"
#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
#include "util/impalad-metrics.h"
#include "util/uid-util.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 478c1ea..35cb945 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -41,6 +41,7 @@
#include "runtime/plan-fragment-executor.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
#include "runtime/raw-value.inline.h"
#include "runtime/timestamp-value.h"
#include "scheduling/simple-scheduler.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 1dc4982..ce5df38 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -31,11 +31,12 @@
#include "common/logging.h"
#include "common/version.h"
#include "exprs/expr.h"
+#include "rpc/thrift-util.h"
#include "runtime/raw-value.h"
#include "service/query-exec-state.h"
#include "service/query-options.h"
#include "util/debug-util.h"
-#include "rpc/thrift-util.h"
+#include "util/runtime-profile-counters.h"
#include "util/impalad-metrics.h"
#include "util/string-parser.h"
#include "service/hs2-util.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index b534902..ec70c47 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -18,6 +18,8 @@
#include "exprs/expr.h"
#include "exprs/expr-context.h"
+#include "resourcebroker/resource-broker.h"
+#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "service/impala-server.h"
@@ -25,6 +27,7 @@
#include "service/query-options.h"
#include "util/debug-util.h"
#include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
#include "util/time.h"
#include "gen-cpp/CatalogService.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 3808459..ddf663e 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -21,23 +21,22 @@
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
#include "statestore/statestore.h"
#include "util/stopwatch.h"
#include "rpc/thrift-util.h"
#include "rpc/thrift-client.h"
-#include "util/thread.h"
#include "util/metrics.h"
#include "gen-cpp/StatestoreService.h"
#include "gen-cpp/StatestoreSubscriber.h"
namespace impala {
-class TimeoutFailureDetector;
class Status;
-class TNetworkAddress;
+class TimeoutFailureDetector;
+class Thread;
class ThriftServer;
+class TNetworkAddress;
typedef ClientCache<StatestoreServiceClient> StatestoreClientCache;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/testutil/desc-tbl-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/desc-tbl-builder.cc b/be/src/testutil/desc-tbl-builder.cc
index 358ae54..3b3994a 100644
--- a/be/src/testutil/desc-tbl-builder.cc
+++ b/be/src/testutil/desc-tbl-builder.cc
@@ -15,7 +15,7 @@
#include "testutil/desc-tbl-builder.h"
#include "util/bit-util.h"
-
+#include "common/object-pool.h"
#include "runtime/descriptors.h"
#include "common/names.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/testutil/desc-tbl-builder.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/desc-tbl-builder.h b/be/src/testutil/desc-tbl-builder.h
index ec816ee..fdaf67e 100644
--- a/be/src/testutil/desc-tbl-builder.h
+++ b/be/src/testutil/desc-tbl-builder.h
@@ -16,6 +16,7 @@
#define IMPALA_TESTUTIL_ROW_DESC_BUILDER_H_
#include "runtime/runtime-state.h"
+#include "runtime/types.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/auth-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/auth-util.cc b/be/src/util/auth-util.cc
index e75a250..afe037c 100644
--- a/be/src/util/auth-util.cc
+++ b/be/src/util/auth-util.cc
@@ -14,6 +14,8 @@
#include "util/auth-util.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+
using namespace std;
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/auth-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/auth-util.h b/be/src/util/auth-util.h
index e44bf70..d82c61a 100644
--- a/be/src/util/auth-util.h
+++ b/be/src/util/auth-util.h
@@ -18,16 +18,16 @@
#include <string>
-#include "gen-cpp/ImpalaInternalService_types.h"
-
namespace impala {
- /// Returns a reference to the "effective user" from the specified session. Queries
- /// are run and authorized on behalf of the effective user. When a delegated_user is
- /// specified (is not empty), the effective user is the delegated_user. This is because
- /// the connected_user is acting as a "proxy user" for the delegated_user. When
- /// delegated_user is empty, the effective user is the connected user.
- const std::string& GetEffectiveUser(const TSessionState& session);
+class TSessionState;
+
+/// Returns a reference to the "effective user" from the specified session. Queries
+/// are run and authorized on behalf of the effective user. When a delegated_user is
+/// specified (is not empty), the effective user is the delegated_user. This is because
+/// the connected_user is acting as a "proxy user" for the delegated_user. When
+/// delegated_user is empty, the effective user is the connected user.
+const std::string& GetEffectiveUser(const TSessionState& session);
} // namespace impala
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/avro-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/avro-util.cc b/be/src/util/avro-util.cc
index b591f23..5cce8a3 100644
--- a/be/src/util/avro-util.cc
+++ b/be/src/util/avro-util.cc
@@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include <avro/schema.h>
-
#include "util/avro-util.h"
+#include <avro/schema.h>
+#include <sstream>
+
using namespace std;
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index afe3791..a337983 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -17,8 +17,6 @@
#define IMPALA_UTIL_CODEC_H
#include "common/status.h"
-#include "runtime/mem-pool.h"
-#include "util/runtime-profile.h"
#include <boost/scoped_ptr.hpp>
#include "gen-cpp/Descriptors_types.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/decompress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/decompress.cc b/be/src/util/decompress.cc
index 588aff6..8a2ea74 100644
--- a/be/src/util/decompress.cc
+++ b/be/src/util/decompress.cc
@@ -15,6 +15,7 @@
#include <boost/assign/list_of.hpp>
#include "util/decompress.h"
#include "exec/read-write-util.h"
+#include "runtime/mem-tracker.h"
#include "runtime/runtime-state.h"
#include "common/logging.h"
#include "gen-cpp/Descriptors_types.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 7a48886..795a6b1 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -24,7 +24,6 @@
#include "runtime/mem-pool.h"
#include "runtime/string-value.h"
#include "util/rle-encoding.h"
-#include "util/runtime-profile.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/hdfs-bulk-ops-defs.h
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops-defs.h b/be/src/util/hdfs-bulk-ops-defs.h
new file mode 100644
index 0000000..13e1be0
--- /dev/null
+++ b/be/src/util/hdfs-bulk-ops-defs.h
@@ -0,0 +1,31 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_UTIL_HDFS_BULK_OPS_DEFS_H
+#define IMPALA_UTIL_HDFS_BULK_OPS_DEFS_H
+
+namespace impala {
+
+/// Forward declarations for HDFS ops.
+/// Only names are introduced here; nothing in this header defines the classes.
+/// NOTE(review): ThreadPool is presumably defined in util/thread-pool.h - confirm.
+template <typename T>
+class ThreadPool;
+
+/// Declared here by name only.
+class HdfsOp;
+class HdfsOperationSet;
+
+/// Alias for a ThreadPool parameterised on HdfsOp.
+typedef ThreadPool<HdfsOp> HdfsOpThreadPool;
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/hdfs-bulk-ops.h
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-bulk-ops.h b/be/src/util/hdfs-bulk-ops.h
index e66f0d1..97feaa0 100644
--- a/be/src/util/hdfs-bulk-ops.h
+++ b/be/src/util/hdfs-bulk-ops.h
@@ -21,6 +21,7 @@
#include "common/hdfs.h"
#include "common/atomic.h"
#include "common/status.h"
+#include "util/hdfs-bulk-ops-defs.h"
#include "util/thread-pool.h"
#include "util/counting-barrier.h"
#include "runtime/hdfs-fs-cache.h"
@@ -36,8 +37,6 @@ enum HdfsOpType {
CHMOD
};
-class HdfsOperationSet;
-
/// Container class that encapsulates a single HDFS operation. Used only internally by
/// HdfsOperationSet, but visible because it parameterises HdfsOpThreadPool.
class HdfsOp {
@@ -81,8 +80,6 @@ class HdfsOp {
void AddError(const string& error_msg) const;
};
-typedef ThreadPool<HdfsOp> HdfsOpThreadPool;
-
/// Creates a new HdfsOp-processing thread pool.
HdfsOpThreadPool* CreateHdfsOpThreadPool(const std::string& name, uint32_t num_threads,
uint32_t max_queue_length);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index ab4b6c2..9ddaa48 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -24,8 +24,8 @@
#include <boost/thread/locks.hpp>
#include "common/logging.h"
-#include "common/status.h"
#include "common/object-pool.h"
+#include "common/status.h"
#include "util/debug-util.h"
#include "util/json-util.h"
#include "util/pretty-printer.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/periodic-counter-updater.cc
----------------------------------------------------------------------
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index a3979d8..d1cbac5 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -14,6 +14,7 @@
#include "util/periodic-counter-updater.h"
+#include "util/runtime-profile-counters.h"
#include "util/time.h"
#include "common/names.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
new file mode 100644
index 0000000..4b51c4f
--- /dev/null
+++ b/be/src/util/runtime-profile-counters.h
@@ -0,0 +1,488 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+#ifndef IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
+#define IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/unordered_map.hpp>
+#include <sys/time.h>
+#include <sys/resource.h>
+
+#include "common/atomic.h"
+#include "common/logging.h"
+#include "util/runtime-profile.h"
+#include "util/stopwatch.h"
+#include "util/streaming-sampler.h"
+
+namespace impala {
+
+/// Define macros for updating counters. The macros make it very easy to disable
+/// all counters at compile time. Set this to 0 to remove counters. This is useful
+/// to do to make sure the counters aren't affecting the system.
+#define ENABLE_COUNTERS 1
+
+/// Some macro magic to generate unique ids using __COUNTER__
+#define CONCAT_IMPL(x, y) x##y
+#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
+
+#if ENABLE_COUNTERS
+  /// Enabled variants: forward to the profile / counter objects, or declare a uniquely
+  /// named scoped helper object (the name is made unique with __COUNTER__).
+  #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
+  #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
+      (profile)->AddTimeSeriesCounter(name, src_counter)
+  #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
+  #define ADD_CHILD_TIMER(profile, name, parent) \
+      (profile)->AddCounter(name, TUnit::TIME_NS, parent)
+  #define SCOPED_TIMER(c) \
+      ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
+  #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \
+      ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled)
+  #define COUNTER_ADD(c, v) (c)->Add(v)
+  #define COUNTER_SET(c, v) (c)->Set(v)
+  #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->AddThreadCounters(prefix)
+  #define SCOPED_THREAD_COUNTER_MEASUREMENT(c) \
+      ThreadCounterMeasurement \
+          MACRO_CONCAT(SCOPED_THREAD_COUNTER_MEASUREMENT, __COUNTER__)(c)
+  #define SCOPED_CONCURRENT_COUNTER(c) \
+      ScopedStopWatch<RuntimeProfile::ConcurrentTimerCounter> \
+          MACRO_CONCAT(SCOPED_CONCURRENT_COUNTER, __COUNTER__)(c)
+#else
+  /// Disabled variants: expand to NULL or to nothing. Each one must accept exactly the
+  /// same parameter list as its enabled counterpart, otherwise call sites stop
+  /// compiling as soon as ENABLE_COUNTERS is set to 0.
+  #define ADD_COUNTER(profile, name, unit) NULL
+  #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) NULL
+  #define ADD_TIMER(profile, name) NULL
+  #define ADD_CHILD_TIMER(profile, name, parent) NULL
+  #define SCOPED_TIMER(c)
+  #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled)
+  #define COUNTER_ADD(c, v)
+  #define COUNTER_SET(c, v)
+  #define ADD_THREAD_COUNTERS(profile, prefix) NULL
+  #define SCOPED_THREAD_COUNTER_MEASUREMENT(c)
+  #define SCOPED_CONCURRENT_COUNTER(c)
+#endif
+
+/// Counter that tracks a current value together with the largest value it has held.
+/// The inherited 'value_' stores the high water mark, while current_value() reports
+/// the present value.
+class RuntimeProfile::HighWaterMarkCounter : public RuntimeProfile::Counter {
+ public:
+  HighWaterMarkCounter(TUnit::type unit) : Counter(unit) {}
+
+  /// Adds 'delta' to the current value and raises the high water mark if the value
+  /// returned by the add exceeds it.
+  virtual void Add(int64_t delta) { UpdateMax(current_value_.Add(delta)); }
+
+  /// Attempts to add 'delta' to the current value without letting it exceed 'max'.
+  /// Returns false, leaving the current value untouched, if current_value() + delta
+  /// would be larger than 'max'. Otherwise applies the change and returns true.
+  bool TryAdd(int64_t delta, int64_t max) {
+    for (;;) {
+      const int64_t observed = current_value_.Load();
+      const int64_t proposed = observed + delta;
+      if (UNLIKELY(proposed > max)) return false;
+      // The value no longer equals what was read above: start over with a fresh read.
+      if (UNLIKELY(!current_value_.CompareAndSwap(observed, proposed))) continue;
+      UpdateMax(proposed);
+      return true;
+    }
+  }
+
+  /// Overwrites the current value with 'v' and raises the high water mark if needed.
+  virtual void Set(int64_t v) {
+    current_value_.Store(v);
+    UpdateMax(v);
+  }
+
+  int64_t current_value() const { return current_value_.Load(); }
+
+ private:
+  /// Raises 'value_' to 'v' when 'v' is larger; otherwise leaves it alone. The update
+  /// is done with compare-and-swap, retrying until it succeeds or 'v' is no longer
+  /// larger than the stored maximum.
+  void UpdateMax(int64_t v) {
+    int64_t seen_max = value_.Load();
+    // No atomic write is attempted unless 'v' actually beats the stored maximum.
+    while (v > seen_max) {
+      if (LIKELY(value_.CompareAndSwap(seen_max, v))) return;
+      seen_max = value_.Load();
+    }
+  }
+
+  /// Present value of the counter. The high water mark lives in the base class
+  /// member 'value_'.
+  AtomicInt64 current_value_;
+};
+
+/// Counter whose value is computed on demand by a supplied function instead of being
+/// stored. Like other counters it has a name and a unit. Set() and Add() must not be
+/// called on it.
+class RuntimeProfile::DerivedCounter : public RuntimeProfile::Counter {
+ public:
+  DerivedCounter(TUnit::type unit, const DerivedCounterFunction& counter_fn)
+    : Counter(unit), counter_fn_(counter_fn) {}
+
+  /// Invokes the wrapped function and returns whatever it produces.
+  virtual int64_t value() const { return counter_fn_(); }
+
+ private:
+  /// Function evaluated by value() to obtain the counter's value.
+  DerivedCounterFunction counter_fn_;
+};
+
+/// An AveragedCounter maintains a set of counters and its value is the
+/// average of the values in that set. The average is updated through calls
+/// to UpdateCounter(), which may add a new counter or update an existing counter.
+/// Set() and Add() should not be called.
+class RuntimeProfile::AveragedCounter : public RuntimeProfile::Counter {
+ public:
+  AveragedCounter(TUnit::type unit)
+    : Counter(unit),
+      current_double_sum_(0.0),
+      current_int_sum_(0) {
+  }
+
+  /// Records the latest value of 'new_counter' in counter_value_map_ (inserting it if
+  /// it is not tracked yet) and recomputes the average stored in 'value_'.
+  /// 'new_counter' must have the same unit as this counter.
+  /// No locks are obtained within this class because UpdateCounter() is called from
+  /// UpdateAverage(), which obtains locks on the entire counter map in a profile.
+  void UpdateCounter(Counter* new_counter) {
+    DCHECK_EQ(new_counter->unit_, unit_);
+    boost::unordered_map<Counter*, int64_t>::iterator it =
+        counter_value_map_.find(new_counter);
+    int64_t old_val = 0;
+    if (it != counter_value_map_.end()) {
+      old_val = it->second;
+      it->second = new_counter->value();
+    } else {
+      counter_value_map_[new_counter] = new_counter->value();
+    }
+
+    // At least 1 here: 'new_counter' was either already present or just inserted.
+    // Kept as a signed value so the integer average below is computed with signed
+    // division; dividing by the unsigned size() would convert a negative sum to a
+    // huge unsigned number first.
+    const int64_t num_counters = static_cast<int64_t>(counter_value_map_.size());
+
+    if (unit_ == TUnit::DOUBLE_VALUE) {
+      // For DOUBLE_VALUE counters the int64_t slots hold the bit pattern of a double.
+      double old_double_val = *reinterpret_cast<double*>(&old_val);
+      current_double_sum_ += (new_counter->double_value() - old_double_val);
+      double result_val = current_double_sum_ / static_cast<double>(num_counters);
+      value_.Store(*reinterpret_cast<int64_t*>(&result_val));
+    } else {
+      current_int_sum_ += (new_counter->value() - old_val);
+      value_.Store(current_int_sum_ / num_counters);
+    }
+  }
+
+  /// The value for this counter should be updated through UpdateCounter().
+  /// Set() and Add() should not be used; they trip a DCHECK.
+  virtual void Set(double value) {
+    DCHECK(false);
+  }
+
+  virtual void Set(int64_t value) {
+    DCHECK(false);
+  }
+
+  virtual void Add(int64_t delta) {
+    DCHECK(false);
+  }
+
+ private:
+  /// Map from counters to their existing values. Modified via UpdateCounter().
+  boost::unordered_map<Counter*, int64_t> counter_value_map_;
+
+  /// Current sums of values from counter_value_map_. Only one of these is used,
+  /// depending on the unit of the counter. current_double_sum_ is used for
+  /// DOUBLE_VALUE, current_int_sum_ otherwise.
+  double current_double_sum_;
+  int64_t current_int_sum_;
+};
+
+/// A set of counters that measure thread info, such as total time, user time, sys time.
+/// Every member is private, so the counters can only be reached from the two friend
+/// classes named below.
+class RuntimeProfile::ThreadCounters {
+ private:
+ friend class ThreadCounterMeasurement;
+ friend class RuntimeProfile;
+
+ // None of the pointers below are initialised by this class itself.
+ Counter* total_time_; // total wall clock time
+ Counter* user_time_; // user CPU time
+ Counter* sys_time_; // system CPU time
+
+ /// The number of times a context switch resulted due to a process voluntarily giving
+ /// up the processor before its time slice was completed.
+ Counter* voluntary_context_switches_;
+
+ /// The number of times a context switch resulted due to a higher priority process
+ /// becoming runnable or because the current process exceeded its time slice.
+ Counter* involuntary_context_switches_;
+};
+
+/// An EventSequence captures a sequence of events (each added by
+/// calling MarkEvent). Each event has a text label, and a time
+/// (measured relative to the moment Start() was called as t=0). It is
+/// useful for tracking the evolution of some serial process, such as
+/// the query lifecycle.
+class RuntimeProfile::EventSequence {
+ public:
+  EventSequence() { }
+
+  /// Helper constructor for building from Thrift. 'timestamps' and 'labels' must have
+  /// the same length; element i of each describes event i.
+  EventSequence(const std::vector<int64_t>& timestamps,
+                const std::vector<std::string>& labels) {
+    DCHECK(timestamps.size() == labels.size());
+    events_.reserve(timestamps.size());
+    // Unsigned index: avoids comparing a signed int against size().
+    for (size_t i = 0; i < timestamps.size(); ++i) {
+      events_.push_back(Event(labels[i], timestamps[i]));
+    }
+  }
+
+  /// Starts the timer without resetting it.
+  void Start() { sw_.Start(); }
+
+  /// Stores an event in sequence with the given label and the current time
+  /// (relative to the first time Start() was called) as the timestamp.
+  void MarkEvent(const std::string& label) {
+    // Copy the label before taking the lock; the timestamp is filled in afterwards.
+    Event event(label, 0);
+    boost::lock_guard<SpinLock> event_lock(lock_);
+    // Sample the clock while holding the lock so that the order in which events are
+    // appended matches the order of their timestamps. Sampling before acquiring the
+    // lock lets a caller with an older timestamp append after one with a newer
+    // timestamp, breaking the increasing-time order of events_.
+    event.second = sw_.ElapsedTime();
+    events_.push_back(event);
+  }
+
+  /// Returns the time elapsed on the internal stopwatch.
+  int64_t ElapsedTime() { return sw_.ElapsedTime(); }
+
+  /// An Event is a <label, timestamp> pair.
+  typedef std::pair<std::string, int64_t> Event;
+
+  /// An EventList is a sequence of Events, in increasing timestamp order.
+  typedef std::vector<Event> EventList;
+
+  /// Copies the member events_ into the supplied vector 'events'.
+  /// The supplied vector 'events' is cleared before this.
+  void GetEvents(std::vector<Event>* events) {
+    events->clear();
+    boost::lock_guard<SpinLock> event_lock(lock_);
+    events->insert(events->end(), events_.begin(), events_.end());
+  }
+
+  void ToThrift(TEventSequence* seq) const;
+
+ private:
+  /// Protect access to events_.
+  SpinLock lock_;
+
+  /// Stored in increasing time order.
+  EventList events_;
+
+  /// Timer which allows events to be timestamped when they are recorded.
+  MonotonicStopWatch sw_;
+};
+
+ typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
+
+ /// A named counter with a unit that accumulates a series of samples in a
+ /// StreamingCounterSampler. Instances are created only by RuntimeProfile (the
+ /// constructors are private and RuntimeProfile is a friend).
+ class RuntimeProfile::TimeSeriesCounter {
+  public:
+   std::string DebugString() const;
+
+   /// Takes one sample by invoking the sample function and records it together with
+   /// 'ms_elapsed'. A counter built from existing sample data has no sample function
+   /// and is read-only, so the call is a no-op for it rather than an invocation of an
+   /// empty boost::function.
+   void AddSample(int ms_elapsed) {
+     if (sample_fn_.empty()) return;
+     int64_t sample = sample_fn_();
+     samples_.AddSample(sample, ms_elapsed);
+   }
+
+  private:
+   friend class RuntimeProfile;
+
+   /// Construct a live counter whose samples are produced by calling 'fn'.
+   TimeSeriesCounter(const std::string& name, TUnit::type unit,
+       DerivedCounterFunction fn)
+     : name_(name), unit_(unit), sample_fn_(fn) {
+   }
+
+   /// Construct a time series object from existing sample data. This counter
+   /// is then read-only (i.e. there is no sample function).
+   TimeSeriesCounter(const std::string& name, TUnit::type unit, int period,
+       const std::vector<int64_t>& values)
+     : name_(name), unit_(unit), sample_fn_(NULL), samples_(period, values) {
+   }
+
+   void ToThrift(TTimeSeriesCounter* counter);
+
+   std::string name_;
+   TUnit::type unit_;
+   /// Empty for read-only counters; see AddSample().
+   DerivedCounterFunction sample_fn_;
+   StreamingCounterSampler samples_;
+ };
+
+ /// Counter whose reported value is read from an internal ConcurrentStopWatch, which is
+ /// used to track the concurrent running time of multiple threads.
+ class RuntimeProfile::ConcurrentTimerCounter : public Counter {
+  public:
+   ConcurrentTimerCounter(TUnit::type unit) : Counter(unit) {}
+
+   /// Reports the stopwatch's total running time.
+   virtual int64_t value() const { return csw_.TotalRunningTime(); }
+
+   /// Start() and Stop() forward directly to the internal stopwatch.
+   void Start() { csw_.Start(); }
+   void Stop() { csw_.Stop(); }
+
+   /// Returns lap time for callers that want a delta update of concurrent running time.
+   uint64_t LapTime() { return csw_.LapTime(); }
+
+   /// The value comes from the internal ConcurrentStopWatch, so Set() and Add() must
+   /// not be used; every overload is flagged with DCHECK(false).
+   virtual void Set(double value) { DCHECK(false); }
+   virtual void Set(int64_t value) { DCHECK(false); }
+   virtual void Set(int value) { DCHECK(false); }
+   virtual void Add(int64_t delta) { DCHECK(false); }
+
+  private:
+   ConcurrentStopWatch csw_;
+ };
+
+ /// Scope guard that records an event: when the object is destroyed it calls
+ /// MarkEvent() with the stored label on the event sequence supplied at construction.
+ class ScopedEvent {
+  public:
+   ScopedEvent(RuntimeProfile::EventSequence* event_sequence, const std::string& label)
+     : label_(label), event_sequence_(event_sequence) {}
+
+   /// Marks the event as the object goes away.
+   ~ScopedEvent() { event_sequence_->MarkEvent(label_); }
+
+  private:
+   /// Not copyable: copy constructor and assignment are declared but not defined.
+   ScopedEvent(const ScopedEvent& event);
+   ScopedEvent& operator=(const ScopedEvent& event);
+
+   const std::string label_;
+   RuntimeProfile::EventSequence* event_sequence_;
+ };
+
+ /// Adds elapsed time to a counter when the object goes out of scope.
+ /// 'T' must provide the StopWatch "interface" (Start, Stop, ElapsedTime); a template
+ /// is used instead of virtual functions to avoid their overhead. Because the runtime
+ /// profile may be deleted while the counter is still active, an optional
+ /// 'is_cancelled' flag can be supplied: when it reads true the counter is not
+ /// updated. The destructor may dereference both 'is_cancelled' and the counter, so
+ /// the caller must keep both valid until the end of the scope that owns the timer.
+ template<class T>
+ class ScopedTimer {
+  public:
+   ScopedTimer(RuntimeProfile::Counter* counter, const bool* is_cancelled = NULL) :
+     counter_(counter), is_cancelled_(is_cancelled){
+     // With no counter there is nothing to time, so the stopwatch is left unstarted.
+     if (counter != NULL) {
+       DCHECK(counter->unit() == TUnit::TIME_NS);
+       sw_.Start();
+     }
+   }
+
+   void Stop() { sw_.Stop(); }
+   void Start() { sw_.Start(); }
+
+   /// Adds the stopwatch's elapsed time to the counter, unless there is no counter or
+   /// the cancellation flag is set.
+   void UpdateCounter() {
+     if (counter_ == NULL || IsCancelled()) return;
+     counter_->Add(sw_.ElapsedTime());
+   }
+
+   /// Updates the underlying counter for the final time and clears the pointer to it.
+   void ReleaseCounter() {
+     UpdateCounter();
+     counter_ = NULL;
+   }
+
+   /// True only when a cancellation flag was supplied and it currently reads true.
+   bool IsCancelled() {
+     if (is_cancelled_ == NULL) return false;
+     return *is_cancelled_;
+   }
+
+   /// Stops the stopwatch and performs the final counter update.
+   ~ScopedTimer() {
+     sw_.Stop();
+     UpdateCounter();
+   }
+
+  private:
+   /// Not copyable: copy constructor and assignment are declared but not defined.
+   ScopedTimer(const ScopedTimer& timer);
+   ScopedTimer& operator=(const ScopedTimer& timer);
+
+   T sw_;
+   RuntimeProfile::Counter* counter_;
+   const bool* is_cancelled_;
+ };
+
+
+#ifdef __APPLE__
+// On OS X rusage via thread is not supported. In addition, the majority of the fields of
+ // the usage structs will be zeroed out. Since Apple is not going to be a major platform
+// initially it will most likely be enough to capture only time.
+// C.f. http://blog.kuriositaet.de/?p=257
+#define RUSAGE_THREAD RUSAGE_SELF
+#endif
+
+ /// Scope guard that updates a ThreadCounters set when Stop() is called or, failing
+ /// that, when the object is destroyed. Thread measurements are taken with getrusage
+ /// at construction and again at Stop(); the differences are added to the counters.
+ /// This is ~5x slower than ScopedTimer due to calling getrusage.
+ class ThreadCounterMeasurement {
+  public:
+   ThreadCounterMeasurement(RuntimeProfile::ThreadCounters* counters) :
+     stop_(false), counters_(counters) {
+     DCHECK(counters != NULL);
+     sw_.Start();
+     int ret = getrusage(RUSAGE_THREAD, &usage_base_);
+     DCHECK_EQ(ret, 0);
+   }
+
+   /// Stops the measurement and updates the counters. Only the first call has any
+   /// effect; later calls return immediately.
+   void Stop() {
+     if (stop_) return;
+     stop_ = true;
+     sw_.Stop();
+
+     rusage usage_now;
+     int ret = getrusage(RUSAGE_THREAD, &usage_now);
+     DCHECK_EQ(ret, 0);
+
+     int64_t user_ns = TimevalDiffNs(usage_now.ru_utime, usage_base_.ru_utime);
+     int64_t sys_ns = TimevalDiffNs(usage_now.ru_stime, usage_base_.ru_stime);
+
+     counters_->total_time_->Add(sw_.ElapsedTime());
+     counters_->user_time_->Add(user_ns);
+     counters_->sys_time_->Add(sys_ns);
+     counters_->voluntary_context_switches_->Add(
+         usage_now.ru_nvcsw - usage_base_.ru_nvcsw);
+     counters_->involuntary_context_switches_->Add(
+         usage_now.ru_nivcsw - usage_base_.ru_nivcsw);
+   }
+
+   /// Update counter when object is destroyed
+   ~ThreadCounterMeasurement() {
+     Stop();
+   }
+
+  private:
+   /// Not copyable: copy constructor and assignment are declared but not defined.
+   ThreadCounterMeasurement(const ThreadCounterMeasurement& timer);
+   ThreadCounterMeasurement& operator=(const ThreadCounterMeasurement& timer);
+
+   /// Returns 'end' - 'begin' in nanoseconds: whole seconds are scaled by 10^9 and
+   /// microseconds by 10^3.
+   static int64_t TimevalDiffNs(const timeval& end, const timeval& begin) {
+     return (end.tv_sec - begin.tv_sec) * 1000L * 1000L * 1000L +
+         (end.tv_usec - begin.tv_usec) * 1000L;
+   }
+
+   bool stop_;
+   rusage usage_base_;
+   MonotonicStopWatch sw_;
+   RuntimeProfile::ThreadCounters* counters_;
+ };
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index d98b54c..2c80082 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -20,7 +20,7 @@
#include "common/object-pool.h"
#include "util/cpu-info.h"
#include "util/periodic-counter-updater.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
#include "util/streaming-sampler.h"
#include "util/thread.h"
#include "util/time.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index bc7447a..714180a 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
#include <iomanip>
#include <iostream>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 8030d7b..98a02ed 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -17,64 +17,16 @@
#define IMPALA_UTIL_RUNTIME_PROFILE_H
#include <boost/function.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/unordered_map.hpp>
+#include <boost/thread/lock_guard.hpp>
#include <iostream>
-#include <sys/time.h>
-#include <sys/resource.h>
#include "common/atomic.h"
-#include "common/logging.h"
-#include "common/object-pool.h"
-#include "util/stopwatch.h"
-#include "util/streaming-sampler.h"
+#include "util/spinlock.h"
+
#include "gen-cpp/RuntimeProfile_types.h"
namespace impala {
-/// Define macros for updating counters. The macros make it very easy to disable
-/// all counters at compile time. Set this to 0 to remove counters. This is useful
-/// to do to make sure the counters aren't affecting the system.
-#define ENABLE_COUNTERS 1
-
-/// Some macro magic to generate unique ids using __COUNTER__
-#define CONCAT_IMPL(x, y) x##y
-#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
-
-#if ENABLE_COUNTERS
- #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
- #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
- (profile)->AddTimeSeriesCounter(name, src_counter)
- #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
- #define ADD_CHILD_TIMER(profile, name, parent) \
- (profile)->AddCounter(name, TUnit::TIME_NS, parent)
- #define SCOPED_TIMER(c) \
- ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
- #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \
- ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled)
- #define COUNTER_ADD(c, v) (c)->Add(v)
- #define COUNTER_SET(c, v) (c)->Set(v)
- #define ADD_THREAD_COUNTERS(profile, prefix) (profile)->AddThreadCounters(prefix)
- #define SCOPED_THREAD_COUNTER_MEASUREMENT(c) \
- ThreadCounterMeasurement \
- MACRO_CONCAT(SCOPED_THREAD_COUNTER_MEASUREMENT, __COUNTER__)(c)
- #define SCOPED_CONCURRENT_COUNTER(c) \
- ScopedStopWatch<RuntimeProfile::ConcurrentTimerCounter> \
- MACRO_CONCAT(SCOPED_CONCURRENT_COUNTER, __COUNTER__)(c)
-#else
- #define ADD_COUNTER(profile, name, unit) NULL
- #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) NULL
- #define ADD_TIMER(profile, name) NULL
- #define ADD_CHILD_TIMER(profile, name, parent) NULL
- #define SCOPED_TIMER(c)
- #define CANCEL_SAFE_SCOPED_TIMER(c)
- #define COUNTER_ADD(c, v)
- #define COUNTER_SET(c, v)
- #define ADD_THREAD_COUNTERS(profile, prefix) NULL
- #define SCOPED_THREAD_COUNTER_MEASUREMENT(c)
- #define SCOPED_CONCURRENT_COUNTER(c)
-#endif
-
class ObjectPool;
/// Runtime profile is a group of profiling counters. It supports adding named counters
@@ -133,285 +85,16 @@ class RuntimeProfile {
TUnit::type unit_;
};
- /// A counter that keeps track of the highest value seen (reporting that
- /// as value()) and the current value.
- class HighWaterMarkCounter : public Counter {
- public:
- HighWaterMarkCounter(TUnit::type unit) : Counter(unit) {}
-
- virtual void Add(int64_t delta) {
- int64_t new_val = current_value_.Add(delta);
- UpdateMax(new_val);
- }
-
- /// Tries to increase the current value by delta. If current_value() + delta
- /// exceeds max, return false and current_value is not changed.
- bool TryAdd(int64_t delta, int64_t max) {
- while (true) {
- int64_t old_val = current_value_.Load();
- int64_t new_val = old_val + delta;
- if (UNLIKELY(new_val > max)) return false;
- if (LIKELY(current_value_.CompareAndSwap(old_val, new_val))) {
- UpdateMax(new_val);
- return true;
- }
- }
- }
-
- virtual void Set(int64_t v) {
- current_value_.Store(v);
- UpdateMax(v);
- }
-
- int64_t current_value() const { return current_value_.Load(); }
-
- private:
- /// Set 'value_' to 'v' if 'v' is larger than 'value_'. The entire operation is
- /// atomic.
- void UpdateMax(int64_t v) {
- while (true) {
- int64_t old_max = value_.Load();
- int64_t new_max = std::max(old_max, v);
- if (new_max == old_max) break; // Avoid atomic update.
- if (LIKELY(value_.CompareAndSwap(old_max, new_max))) break;
- }
- }
-
- /// The current value of the counter. value_ in the super class represents
- /// the high water mark.
- AtomicInt64 current_value_;
- };
+ class AveragedCounter;
+ class ConcurrentTimerCounter;
+ class DerivedCounter;
+ class EventSequence;
+ class HighWaterMarkCounter;
+ class ThreadCounters;
+ class TimeSeriesCounter;
typedef boost::function<int64_t ()> DerivedCounterFunction;
- /// A DerivedCounter also has a name and unit, but the value is computed.
- /// Do not call Set() and Add().
- class DerivedCounter : public Counter {
- public:
- DerivedCounter(TUnit::type unit, const DerivedCounterFunction& counter_fn)
- : Counter(unit),
- counter_fn_(counter_fn) {}
-
- virtual int64_t value() const {
- return counter_fn_();
- }
-
- private:
- DerivedCounterFunction counter_fn_;
- };
-
- /// An AveragedCounter maintains a set of counters and its value is the
- /// average of the values in that set. The average is updated through calls
- /// to UpdateCounter(), which may add a new counter or update an existing counter.
- /// Set() and Add() should not be called.
- class AveragedCounter : public Counter {
- public:
- AveragedCounter(TUnit::type unit)
- : Counter(unit),
- current_double_sum_(0.0),
- current_int_sum_(0) {
- }
-
- /// Update counter_value_map_ with the new counter. This may require the counter
- /// to be added to the map.
- /// No locks are obtained within this class because UpdateCounter() is called from
- /// UpdateAverage(), which obtains locks on the entire counter map in a profile.
- void UpdateCounter(Counter* new_counter) {
- DCHECK_EQ(new_counter->unit_, unit_);
- boost::unordered_map<Counter*, int64_t>::iterator it =
- counter_value_map_.find(new_counter);
- int64_t old_val = 0;
- if (it != counter_value_map_.end()) {
- old_val = it->second;
- it->second = new_counter->value();
- } else {
- counter_value_map_[new_counter] = new_counter->value();
- }
-
- if (unit_ == TUnit::DOUBLE_VALUE) {
- double old_double_val = *reinterpret_cast<double*>(&old_val);
- current_double_sum_ += (new_counter->double_value() - old_double_val);
- double result_val = current_double_sum_ / (double) counter_value_map_.size();
- value_.Store(*reinterpret_cast<int64_t*>(&result_val));
- } else {
- current_int_sum_ += (new_counter->value() - old_val);
- value_.Store(current_int_sum_ / counter_value_map_.size());
- }
- }
-
- /// The value for this counter should be updated through UpdateCounter().
- /// Set() and Add() should not be used.
- virtual void Set(double value) {
- DCHECK(false);
- }
-
- virtual void Set(int64_t value) {
- DCHECK(false);
- }
-
- virtual void Add(int64_t delta) {
- DCHECK(false);
- }
-
- private:
- /// Map from counters to their existing values. Modified via UpdateCounter().
- boost::unordered_map<Counter*, int64_t> counter_value_map_;
-
- /// Current sums of values from counter_value_map_. Only one of these is used,
- /// depending on the unit of the counter. current_double_sum_ is used for
- /// DOUBLE_VALUE, current_int_sum_ otherwise.
- double current_double_sum_;
- int64_t current_int_sum_;
- };
-
- /// A set of counters that measure thread info, such as total time, user time, sys time.
- class ThreadCounters {
- private:
- friend class ThreadCounterMeasurement;
- friend class RuntimeProfile;
-
- Counter* total_time_; // total wall clock time
- Counter* user_time_; // user CPU time
- Counter* sys_time_; // system CPU time
-
- /// The number of times a context switch resulted due to a process voluntarily giving
- /// up the processor before its time slice was completed.
- Counter* voluntary_context_switches_;
-
- /// The number of times a context switch resulted due to a higher priority process
- /// becoming runnable or because the current process exceeded its time slice.
- Counter* involuntary_context_switches_;
- };
-
- /// An EventSequence captures a sequence of events (each added by
- /// calling MarkEvent). Each event has a text label, and a time
- /// (measured relative to the moment Start() was called as t=0). It is
- /// useful for tracking the evolution of some serial process, such as
- /// the query lifecycle.
- class EventSequence {
- public:
- EventSequence() { }
-
- /// Helper constructor for building from Thrift
- EventSequence(const std::vector<int64_t>& timestamps,
- const std::vector<std::string>& labels) {
- DCHECK(timestamps.size() == labels.size());
- for (int i = 0; i < timestamps.size(); ++i) {
- events_.push_back(make_pair(labels[i], timestamps[i]));
- }
- }
-
- /// Starts the timer without resetting it.
- void Start() { sw_.Start(); }
-
- /// Stores an event in sequence with the given label and the current time
- /// (relative to the first time Start() was called) as the timestamp.
- void MarkEvent(const std::string& label) {
- Event event = make_pair(label, sw_.ElapsedTime());
- boost::lock_guard<SpinLock> event_lock(lock_);
- events_.push_back(event);
- }
-
- int64_t ElapsedTime() { return sw_.ElapsedTime(); }
-
- /// An Event is a <label, timestamp> pair.
- typedef std::pair<std::string, int64_t> Event;
-
- /// An EventList is a sequence of Events, in increasing timestamp order.
- typedef std::vector<Event> EventList;
-
- /// Copies the member events_ into the supplied vector 'events'.
- /// The supplied vector 'events' is cleared before this.
- void GetEvents(std::vector<Event>* events) {
- events->clear();
- boost::lock_guard<SpinLock> event_lock(lock_);
- events->insert(events->end(), events_.begin(), events_.end());
- }
-
- void ToThrift(TEventSequence* seq) const;
-
- private:
- /// Protect access to events_.
- SpinLock lock_;
-
- /// Stored in increasing time order.
- EventList events_;
-
- /// Timer which allows events to be timestamped when they are recorded.
- MonotonicStopWatch sw_;
- };
-
- typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
- class TimeSeriesCounter {
- public:
- std::string DebugString() const;
-
- void AddSample(int ms_elapsed) {
- int64_t sample = sample_fn_();
- samples_.AddSample(sample, ms_elapsed);
- }
-
- private:
- friend class RuntimeProfile;
-
- TimeSeriesCounter(const std::string& name, TUnit::type unit,
- DerivedCounterFunction fn)
- : name_(name), unit_(unit), sample_fn_(fn) {
- }
-
- /// Construct a time series object from existing sample data. This counter
- /// is then read-only (i.e. there is no sample function).
- TimeSeriesCounter(const std::string& name, TUnit::type unit, int period,
- const std::vector<int64_t>& values)
- : name_(name), unit_(unit), sample_fn_(NULL), samples_(period, values) {
- }
-
- void ToThrift(TTimeSeriesCounter* counter);
-
- std::string name_;
- TUnit::type unit_;
- DerivedCounterFunction sample_fn_;
- StreamingCounterSampler samples_;
- };
-
- /// Counter whose value comes from an internal ConcurrentStopWatch to track multiple threads
- /// concurrent running time.
- class ConcurrentTimerCounter : public Counter {
- public:
- ConcurrentTimerCounter(TUnit::type unit) : Counter(unit) {}
-
- virtual int64_t value() const { return csw_.TotalRunningTime(); }
-
- void Start() { csw_.Start(); }
-
- void Stop() { csw_.Stop(); }
-
- /// Returns lap time for caller who wants delta update of concurrent running time.
- uint64_t LapTime() { return csw_.LapTime(); }
-
- /// The value for this counter should come from internal ConcurrentStopWatch.
- /// Set() and Add() should not be used.
- virtual void Set(double value) {
- DCHECK(false);
- }
-
- virtual void Set(int64_t value) {
- DCHECK(false);
- }
-
- virtual void Set(int value) {
- DCHECK(false);
- }
-
- virtual void Add(int64_t delta) {
- DCHECK(false);
- }
-
- private:
- ConcurrentStopWatch csw_;
- };
-
-
/// Create a runtime profile object with 'name'. Counters and merged profile are
/// allocated from pool.
/// If is_averaged_profile is true, the counters in this profile will be derived
@@ -721,166 +404,6 @@ class RuntimeProfile {
const ChildCounterMap& child_counter_map, std::ostream* s);
};
-/// Utility class to mark an event when the object is destroyed.
-class ScopedEvent {
- public:
- ScopedEvent(RuntimeProfile::EventSequence* event_sequence, const std::string& label)
- : label_(label),
- event_sequence_(event_sequence) {
- }
-
- /// Mark the event when the object is destroyed
- ~ScopedEvent() {
- event_sequence_->MarkEvent(label_);
- }
-
- private:
- /// Disable copy constructor and assignment
- ScopedEvent(const ScopedEvent& event);
- ScopedEvent& operator=(const ScopedEvent& event);
-
- const std::string label_;
- RuntimeProfile::EventSequence* event_sequence_;
-};
-
-/// Utility class to update the counter at object construction and destruction.
-/// When the object is constructed, decrement the counter by val.
-/// When the object goes out of scope, increment the counter by val.
-class ScopedCounter {
- public:
- ScopedCounter(RuntimeProfile::Counter* counter, int64_t val) :
- val_(val),
- counter_(counter) {
- if (counter == NULL) return;
- counter_->Add(-1L * val_);
- }
-
- /// Increment the counter when object is destroyed
- ~ScopedCounter() {
- if (counter_ != NULL) counter_->Add(val_);
- }
-
- private:
- /// Disable copy constructor and assignment
- ScopedCounter(const ScopedCounter& counter);
- ScopedCounter& operator=(const ScopedCounter& counter);
-
- int64_t val_;
- RuntimeProfile::Counter* counter_;
-};
-
-/// Utility class to update time elapsed when the object goes out of scope.
-/// 'T' must implement the StopWatch "interface" (Start,Stop,ElapsedTime) but
-/// we use templates not to pay for virtual function overhead. In some cases
-/// the runtime profile may be deleted while the counter is still active. In this
-/// case the is_cancelled argument can be provided so that ScopedTimer will not
-/// update the counter when the query is cancelled. The destructor for ScopedTimer
-/// can access both is_cancelled and the counter, so the caller must ensure that it
-/// is safe to access both at the end of the scope in which the timer is used.
-template<class T>
-class ScopedTimer {
- public:
- ScopedTimer(RuntimeProfile::Counter* counter, const bool* is_cancelled = NULL) :
- counter_(counter), is_cancelled_(is_cancelled){
- if (counter == NULL) return;
- DCHECK(counter->unit() == TUnit::TIME_NS);
- sw_.Start();
- }
-
- void Stop() { sw_.Stop(); }
- void Start() { sw_.Start(); }
-
- void UpdateCounter() {
- if (counter_ != NULL && !IsCancelled()) {
- counter_->Add(sw_.ElapsedTime());
- }
- }
-
- /// Updates the underlying counter for the final time and clears the pointer to it.
- void ReleaseCounter() {
- UpdateCounter();
- counter_ = NULL;
- }
-
- bool IsCancelled() {
- return is_cancelled_ != NULL && *is_cancelled_;
- }
-
- /// Update counter when object is destroyed
- ~ScopedTimer() {
- sw_.Stop();
- UpdateCounter();
- }
-
- private:
- /// Disable copy constructor and assignment
- ScopedTimer(const ScopedTimer& timer);
- ScopedTimer& operator=(const ScopedTimer& timer);
-
- T sw_;
- RuntimeProfile::Counter* counter_;
- const bool* is_cancelled_;
-};
-
-#ifdef __APPLE__
-// On OS X rusage via thread is not supported. In addition, the majority of the fields of
-// the usage structs will be zeroed out. Since Apple is not going to be a major plaform
-// initially it will most likely be enough to capture only time.
-// C.f. http://blog.kuriositaet.de/?p=257
-#define RUSAGE_THREAD RUSAGE_SELF
-#endif
-
-/// Utility class to update ThreadCounter when the object goes out of scope or when Stop is
-/// called. Threads measurements will then be taken using getrusage.
-/// This is ~5x slower than ScopedTimer due to calling getrusage.
-class ThreadCounterMeasurement {
- public:
- ThreadCounterMeasurement(RuntimeProfile::ThreadCounters* counters) :
- stop_(false), counters_(counters) {
- DCHECK(counters != NULL);
- sw_.Start();
- int ret = getrusage(RUSAGE_THREAD, &usage_base_);
- DCHECK_EQ(ret, 0);
- }
-
- /// Stop and update the counter
- void Stop() {
- if (stop_) return;
- stop_ = true;
- sw_.Stop();
- rusage usage;
- int ret = getrusage(RUSAGE_THREAD, &usage);
- DCHECK_EQ(ret, 0);
- int64_t utime_diff =
- (usage.ru_utime.tv_sec - usage_base_.ru_utime.tv_sec) * 1000L * 1000L * 1000L +
- (usage.ru_utime.tv_usec - usage_base_.ru_utime.tv_usec) * 1000L;
- int64_t stime_diff =
- (usage.ru_stime.tv_sec - usage_base_.ru_stime.tv_sec) * 1000L * 1000L * 1000L +
- (usage.ru_stime.tv_usec - usage_base_.ru_stime.tv_usec) * 1000L;
- counters_->total_time_->Add(sw_.ElapsedTime());
- counters_->user_time_->Add(utime_diff);
- counters_->sys_time_->Add(stime_diff);
- counters_->voluntary_context_switches_->Add(usage.ru_nvcsw - usage_base_.ru_nvcsw);
- counters_->involuntary_context_switches_->Add(
- usage.ru_nivcsw - usage_base_.ru_nivcsw);
- }
-
- /// Update counter when object is destroyed
- ~ThreadCounterMeasurement() {
- Stop();
- }
-
- private:
- /// Disable copy constructor and assignment
- ThreadCounterMeasurement(const ThreadCounterMeasurement& timer);
- ThreadCounterMeasurement& operator=(const ThreadCounterMeasurement& timer);
-
- bool stop_;
- rusage usage_base_;
- MonotonicStopWatch sw_;
- RuntimeProfile::ThreadCounters* counters_;
-};
-
}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/simple-logger.cc
----------------------------------------------------------------------
diff --git a/be/src/util/simple-logger.cc b/be/src/util/simple-logger.cc
index 6fca3fa..87ef0f5 100644
--- a/be/src/util/simple-logger.cc
+++ b/be/src/util/simple-logger.cc
@@ -18,6 +18,7 @@
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/filesystem.hpp>
#include <gutil/strings/substitute.h>
+#include <boost/thread/lock_guard.hpp>
#include "common/names.h"
#include "util/logging-support.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/simple-logger.h
----------------------------------------------------------------------
diff --git a/be/src/util/simple-logger.h b/be/src/util/simple-logger.h
index 66031fd..af17d26 100644
--- a/be/src/util/simple-logger.h
+++ b/be/src/util/simple-logger.h
@@ -15,8 +15,8 @@
#ifndef IMPALA_SERVICE_SIMPLE_LOGGER_H
#define IMPALA_SERVICE_SIMPLE_LOGGER_H
-#include <boost/thread/thread.hpp>
#include <fstream>
+#include <boost/thread/mutex.hpp>
#include "common/status.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/streaming-sampler.h
----------------------------------------------------------------------
diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h
index 9cf4151..71e7548 100644
--- a/be/src/util/streaming-sampler.h
+++ b/be/src/util/streaming-sampler.h
@@ -17,6 +17,8 @@
#include <string.h>
#include <iostream>
+#include <boost/thread/lock_guard.hpp>
+
#include "util/spinlock.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/tuple-row-compare.cc
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.cc b/be/src/util/tuple-row-compare.cc
index a0bbf97..18ae53d 100644
--- a/be/src/util/tuple-row-compare.cc
+++ b/be/src/util/tuple-row-compare.cc
@@ -19,6 +19,7 @@
#include "codegen/codegen-anyval.h"
#include "codegen/llvm-codegen.h"
#include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
using namespace impala;
using namespace llvm;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/util/tuple-row-compare.h
----------------------------------------------------------------------
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index 8505f19..d20d864 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -19,10 +19,11 @@
#include "exec/sort-exec-exprs.h"
#include "exprs/expr.h"
#include "exprs/expr-context.h"
+#include "runtime/descriptors.h"
+#include "runtime/raw-value.h"
#include "runtime/raw-value.inline.h"
#include "runtime/tuple.h"
#include "runtime/tuple-row.h"
-#include "runtime/descriptors.h"
namespace impala {