You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/11/01 21:38:37 UTC

[1/4] impala git commit: IMPALA-7213, IMPALA-7241: Port ReportExecStatus() RPC to use KRPC

Repository: impala
Updated Branches:
  refs/heads/master e3a702707 -> 95b56d0e2


http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 7222dae..e517f3e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -385,49 +385,52 @@ struct TQueryCtx {
   // TODO: determine whether we can get this somehow via the Thrift rpc mechanism.
   6: optional Types.TNetworkAddress coord_address
 
+  // The initiating coordinator's address of its KRPC based ImpalaInternalService.
+  7: optional Types.TNetworkAddress coord_krpc_address
+
   // List of tables missing relevant table and/or column stats. Used for
   // populating query-profile fields consumed by CM as well as warning messages.
-  7: optional list<CatalogObjects.TTableName> tables_missing_stats
+  8: optional list<CatalogObjects.TTableName> tables_missing_stats
 
   // Internal flag to disable spilling. Used as a guard against potentially
   // disastrous query plans. The rationale is that cancelling queries, e.g.,
   // with a huge join build is preferable over spilling "forever".
-  8: optional bool disable_spilling
+  9: optional bool disable_spilling
 
   // Set if this is a child query (e.g. a child of a COMPUTE STATS request)
-  9: optional Types.TUniqueId parent_query_id
+  10: optional Types.TUniqueId parent_query_id
 
   // List of tables suspected to have corrupt stats
-  10: optional list<CatalogObjects.TTableName> tables_with_corrupt_stats
+  11: optional list<CatalogObjects.TTableName> tables_with_corrupt_stats
 
   // The snapshot timestamp as of which to execute the query
   // When the backing storage engine supports snapshot timestamps (such as Kudu) this
   // allows to select a snapshot timestamp on which to perform the scan, making sure that
   // results returned from multiple scan nodes are consistent.
   // This defaults to -1 when no timestamp is specified.
-  11: optional i64 snapshot_timestamp = -1;
+  12: optional i64 snapshot_timestamp = -1;
 
   // Optional for frontend tests.
-  12: optional Descriptors.TDescriptorTable desc_tbl
+  13: optional Descriptors.TDescriptorTable desc_tbl
 
   // Milliseconds since UNIX epoch at the start of query execution.
-  13: required i64 start_unix_millis
+  14: required i64 start_unix_millis
 
   // Hint to disable codegen. Set by planner for single-node optimization or by the
   // backend in NativeEvalExprsWithoutRow() in FESupport. This flag is only advisory to
   // avoid the overhead of codegen and can be ignored if codegen is needed functionally.
-  14: optional bool disable_codegen_hint = false;
+  15: optional bool disable_codegen_hint = false;
 
   // List of tables with scan ranges that map to blocks with missing disk IDs.
-  15: optional list<CatalogObjects.TTableName> tables_missing_diskids
+  16: optional list<CatalogObjects.TTableName> tables_missing_diskids
 
   // The resolved admission control pool to which this request will be submitted. May be
   // unset for statements that aren't subjected to admission control (e.g. USE, SET).
-  16: optional string request_pool
+  17: optional string request_pool
 
   // String containing a timestamp (in UTC) set as the query submission time. It
   // represents the same point in time as now_string
-  17: required string utc_timestamp_string
+  18: required string utc_timestamp_string
 
   // String containing name of the local timezone.
   // It is guaranteed to be a valid timezone on the coordinator (but not necessarily on
@@ -436,7 +439,7 @@ struct TQueryCtx {
   //   still has an effect if TimezoneDatabase::LocalZoneName() cannot find the
   //   system's local timezone and falls back to UTC. This logic will be removed in
   //   IMPALA-7359, which will make this member completely obsolete.
-  18: required string local_time_zone
+  19: required string local_time_zone
 }
 
 // Specification of one output destination of a plan fragment
@@ -557,158 +560,6 @@ struct TExecQueryFInstancesResult {
   1: optional Status.TStatus status
 }
 
-
-// ReportExecStatus
-
-struct TParquetInsertStats {
-  // For each column, the on disk byte size
-  1: required map<string, i64> per_column_size
-}
-
-struct TKuduDmlStats {
-  // The number of reported per-row errors, i.e. this many rows were not modified.
-  // Note that this aggregate is less useful than a breakdown of the number of errors by
-  // error type, e.g. number of rows with duplicate key conflicts, number of rows
-  // violating nullability constraints, etc., but it isn't possible yet to differentiate
-  // all error types in the KuduTableSink yet.
-  1: optional i64 num_row_errors
-}
-
-// Per partition DML stats
-// TODO: this should include the table stats that we update the metastore with.
-// TODO: Refactor to reflect usage by other DML statements.
-struct TInsertStats {
-  1: required i64 bytes_written
-  2: optional TParquetInsertStats parquet_stats
-  3: optional TKuduDmlStats kudu_stats
-}
-
-const string ROOT_PARTITION_KEY = ''
-
-// Per-partition statistics and metadata resulting from DML statements.
-// TODO: Refactor to reflect usage by other DML statements.
-struct TInsertPartitionStatus {
-  // The id of the partition written to (may be -1 if the partition is created by this
-  // query). See THdfsTable.partitions.
-  1: optional i64 id
-
-  // The number of rows modified in this partition
-  2: optional i64 num_modified_rows
-
-  // Detailed statistics gathered by table writers for this partition
-  3: optional TInsertStats stats
-
-  // Fully qualified URI to the base directory for this partition.
-  4: required string partition_base_dir
-
-  // The latest observed Kudu timestamp reported by the local KuduSession.
-  // This value is an unsigned int64.
-  5: optional i64 kudu_latest_observed_ts
-}
-
-// The results of a DML statement, sent to the coordinator as part of
-// TReportExecStatusParams
-// TODO: Refactor to reflect usage by other DML statements.
-struct TInsertExecStatus {
-  // A map from temporary absolute file path to final absolute destination. The
-  // coordinator performs these updates after the query completes.
-  1: required map<string, string> files_to_move;
-
-  // Per-partition details, used in finalization and reporting.
-  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
-  // root's key in an unpartitioned table being ROOT_PARTITION_KEY.
-  // The target table name is recorded in the corresponding TQueryExecRequest
-  2: optional map<string, TInsertPartitionStatus> per_partition_status
-}
-
-// Error message exchange format
-struct TErrorLogEntry {
-
-  // Number of error messages reported using the above identifier
-  1: i32 count = 0
-
-  // Sample messages from the above error code
-  2: list<string> messages
-}
-
-// Represents the states that a fragment instance goes through during its execution. The
-// current state gets sent back to the coordinator and will be presented to users through
-// the debug webpages.
-// The states are listed in order and one state will only strictly be reached after all
-// the previous states.
-enum TFInstanceExecState {
-  WAITING_FOR_EXEC,
-  WAITING_FOR_PREPARE,
-  WAITING_FOR_CODEGEN,
-  WAITING_FOR_OPEN,
-  WAITING_FOR_FIRST_BATCH,
-  FIRST_BATCH_PRODUCED,
-  PRODUCING_DATA,
-  LAST_BATCH_SENT,
-  FINISHED
-}
-
-struct TFragmentInstanceExecStatus {
-  // required in V1
-  1: optional Types.TUniqueId fragment_instance_id
-
-  // Status of fragment execution; any error status means it's done.
-  // required in V1
-  2: optional Status.TStatus status
-
-  // If true, fragment finished executing.
-  // required in V1
-  3: optional bool done
-
-  // cumulative profile
-  // required in V1
-  4: optional RuntimeProfile.TRuntimeProfileTree profile
-
-  // The current state of this fragment instance's execution.
-  // required in V1
-  5: optional TFInstanceExecState current_state
-}
-
-struct TReportExecStatusParams {
-  1: required ImpalaInternalServiceVersion protocol_version
-
-  // required in V1
-  2: optional Types.TUniqueId query_id
-
-  // same as TExecQueryFInstancesParams.coord_state_idx
-  // required in V1
-  3: optional i32 coord_state_idx
-
-  4: list<TFragmentInstanceExecStatus> instance_exec_status
-
-  // Cumulative structural changes made by the table sink of any instance
-  // included in instance_exec_status
-  // optional in V1
-  5: optional TInsertExecStatus insert_exec_status;
-
-  // New errors that have not been reported to the coordinator by any of the
-  // instances included in instance_exec_status
-  6: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
-
-  // Cumulative status for this backend. A backend can have an error from a specific
-  // fragment instance, or it can have a general error that is independent of any
-  // individual fragment. If reporting a single error, this status is always set to
-  // the error being reported. If reporting multiple errors, the status is set by the
-  // following rules:
-  // 1. A general error takes precedence over any fragment instance error.
-  // 2. Any fragment instance error takes precedence over any cancelled status.
-  // 3. If multiple fragments have errors, prefer the error that comes first in the
-  // 'instance_exec_status' list.
-  // This status is only OK if all fragment instances included are OK.
-  7: optional Status.TStatus status;
-}
-
-struct TReportExecStatusResult {
-  // required in V1
-  1: optional Status.TStatus status
-}
-
-
 // CancelQueryFInstances
 
 struct TCancelQueryFInstancesParams {
@@ -908,10 +759,6 @@ service ImpalaInternalService {
   // Returns as soon as all incoming data streams have been set up.
   TExecQueryFInstancesResult ExecQueryFInstances(1:TExecQueryFInstancesParams params);
 
-  // Periodically called by backend to report status of fragment instance execution
-  // back to coord; also called when execution is finished, for whatever reason.
-  TReportExecStatusResult ReportExecStatus(1:TReportExecStatusParams params);
-
   // Called by coord to cancel execution of a single query's fragment instances, which
   // the coordinator initiated with a prior call to ExecQueryFInstances.
   // Cancellation is asynchronous.

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/tests/custom_cluster/test_rpc_timeout.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index 419bec0..bbec46a 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -39,10 +39,10 @@ class TestRPCTimeout(CustomClusterTestSuite):
       pytest.skip('runs only in exhaustive')
     super(TestRPCTimeout, cls).setup_class()
 
-  def execute_query_verify_metrics(self, query, repeat = 1):
+  def execute_query_verify_metrics(self, query, query_options=None, repeat=1):
     for i in range(repeat):
       try:
-        self.client.execute(query)
+        self.execute_query(query, query_options)
       except ImpalaBeeswaxException:
         pass
     verifiers = [ MetricVerifier(i.service) for i in ImpalaCluster().impalads ]
@@ -121,20 +121,30 @@ class TestRPCTimeout(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
-      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=5")
-  def test_transmitdata_timeout(self, vector):
-    self.execute_query_verify_metrics(self.TEST_QUERY)
+      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=7"
+      " --datastream_sender_timeout_ms=30000")
+  def test_random_rpc_timeout(self, vector):
+    self.execute_query_verify_metrics(self.TEST_QUERY, None, 10)
 
+  # Inject jitter into the RPC handler of ReportExecStatus() to trigger RPC timeout.
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
-      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=6"
-      " --status_report_interval=1")
+  @CustomClusterTestSuite.with_args("--status_report_interval_ms=10"
+      " --backend_client_rpc_timeout_ms=1000")
   def test_reportexecstatus_timeout(self, vector):
-    self.execute_query_verify_metrics(self.TEST_QUERY)
+    query_options = {'debug_action': 'REPORT_EXEC_STATUS_DELAY:JITTER@1500@0.5'}
+    self.execute_query_verify_metrics(self.TEST_QUERY, query_options, 10)
 
+  # Use a small service queue memory limit and a single service thread to exercise
+  # the retry paths in the ReportExecStatus() RPC
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
-      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=7"
-      " --datastream_sender_timeout_ms=30000")
-  def test_random_rpc_timeout(self, vector):
-    self.execute_query_verify_metrics(self.TEST_QUERY, 10)
+  @CustomClusterTestSuite.with_args("--status_report_interval_ms=10"
+      " --control_service_queue_mem_limit=1 --control_service_num_svc_threads=1")
+  def test_reportexecstatus_retry(self, vector):
+    self.execute_query_verify_metrics(self.TEST_QUERY, None, 10)
+
+  # Inject artificial failure during thrift profile serialization / deserialization.
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--status_report_interval_ms=10")
+  def test_reportexecstatus_profile_fail(self):
+    query_options = {'debug_action': 'REPORT_EXEC_STATUS_PROFILE:FAIL@0.8'}
+    self.execute_query_verify_metrics(self.TEST_QUERY, query_options, 10)


[2/4] impala git commit: IMPALA-7213, IMPALA-7241: Port ReportExecStatus() RPC to use KRPC

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 6a2e5b3..acffcbe 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -35,7 +35,7 @@
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
-#include "rpc/rpc-mgr.inline.h"
+#include "rpc/rpc-mgr.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
@@ -43,6 +43,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
+#include "service/data-stream-service.h"
 #include "util/aligned-new.h"
 #include "util/debug-util.h"
 #include "util/network-util.h"
@@ -306,8 +307,7 @@ Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) {
   batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));
 
   // Create a DataStreamService proxy to the destination.
-  RpcMgr* rpc_mgr = ExecEnv::GetInstance()->rpc_mgr();
-  RETURN_IF_ERROR(rpc_mgr->GetProxy(address_, hostname_, &proxy_));
+  RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 12710bf..c6b8440 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -21,7 +21,13 @@
 #include <boost/thread/locks.hpp>
 
 #include "common/thread-debug-info.h"
+#include "exec/kudu-util.h"
 #include "exprs/expr.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "rpc/rpc-mgr.h"
 #include "runtime/backend-client.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
@@ -33,15 +39,25 @@
 #include "runtime/query-exec-mgr.h"
 #include "runtime/runtime-state.h"
 #include "runtime/scanner-mem-limiter.h"
+#include "service/control-service.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
 #include "util/thread.h"
 
+#include "gen-cpp/control_service.pb.h"
+#include "gen-cpp/control_service.proxy.h"
+
+using kudu::MonoDelta;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcSidecar;
+
 #include "common/names.h"
 
 DEFINE_int32(report_status_retry_interval_ms, 100,
     "The interval in milliseconds to wait before retrying a failed status report RPC to "
     "the coordinator.");
+DECLARE_int32(backend_client_rpc_timeout_ms);
+DECLARE_int64(rpc_max_message_size);
 
 using namespace impala;
 
@@ -111,7 +127,7 @@ QueryState::~QueryState() {
   }
 }
 
-Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
+Status QueryState::Init(const TExecQueryFInstancesParams& exec_rpc_params) {
   // Decremented in QueryExecMgr::StartQueryHelper() on success or by the caller of
   // Init() on failure. We need to do this before any returns because Init() always
   // returns a resource refcount to its caller.
@@ -131,27 +147,31 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
 
   RETURN_IF_ERROR(InitBufferPoolState());
 
+  // Initialize the RPC proxy once and report any error.
+  RETURN_IF_ERROR(ControlService::GetProxy(query_ctx().coord_krpc_address,
+      query_ctx().coord_address.hostname, &proxy_));
+
   // don't copy query_ctx, it's large and we already did that in the c'tor
-  rpc_params_.__set_coord_state_idx(rpc_params.coord_state_idx);
+  exec_rpc_params_.__set_coord_state_idx(exec_rpc_params.coord_state_idx);
   TExecQueryFInstancesParams& non_const_params =
-      const_cast<TExecQueryFInstancesParams&>(rpc_params);
-  rpc_params_.fragment_ctxs.swap(non_const_params.fragment_ctxs);
-  rpc_params_.__isset.fragment_ctxs = true;
-  rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
-  rpc_params_.__isset.fragment_instance_ctxs = true;
+      const_cast<TExecQueryFInstancesParams&>(exec_rpc_params);
+  exec_rpc_params_.fragment_ctxs.swap(non_const_params.fragment_ctxs);
+  exec_rpc_params_.__isset.fragment_ctxs = true;
+  exec_rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
+  exec_rpc_params_.__isset.fragment_instance_ctxs = true;
 
   instances_prepared_barrier_.reset(
-      new CountingBarrier(rpc_params_.fragment_instance_ctxs.size()));
+      new CountingBarrier(exec_rpc_params_.fragment_instance_ctxs.size()));
   instances_finished_barrier_.reset(
-      new CountingBarrier(rpc_params_.fragment_instance_ctxs.size()));
+      new CountingBarrier(exec_rpc_params_.fragment_instance_ctxs.size()));
 
   // Claim the query-wide minimum reservation. Do this last so that we don't need
   // to handle releasing it if a later step fails.
   initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_,
       buffer_reservation_, query_mem_tracker_,
-      rpc_params.initial_mem_reservation_total_claims));
+      exec_rpc_params.initial_mem_reservation_total_claims));
   RETURN_IF_ERROR(
-      initial_reservations_->Init(query_id(), rpc_params.min_mem_reservation_bytes));
+      initial_reservations_->Init(query_id(), exec_rpc_params.min_mem_reservation_bytes));
   scanner_mem_limiter_ = obj_pool_.Add(new ScannerMemLimiter);
   return Status::OK();
 }
@@ -241,6 +261,58 @@ void QueryState::ReportExecStatus(bool done, const Status& status,
   ReportExecStatusAux(done, status, fis, true);
 }
 
+void QueryState::ConstructReport(bool done, const Status& status,
+    FragmentInstanceState* fis, ReportExecStatusRequestPB* report,
+    ThriftSerializer* serializer, uint8_t** profile_buf, uint32_t* profile_len) {
+  report->Clear();
+  TUniqueIdToUniqueIdPB(query_id(), report->mutable_query_id());
+  DCHECK(exec_rpc_params().__isset.coord_state_idx);
+  report->set_coord_state_idx(exec_rpc_params().coord_state_idx);
+  status.ToProto(report->mutable_status());
+
+  if (fis != nullptr) {
+    // create status for 'fis'
+    FragmentInstanceExecStatusPB* instance_status = report->add_instance_exec_status();
+    instance_status->set_report_seq_no(fis->AdvanceReportSeqNo());
+    const TUniqueId& finstance_id = fis->instance_id();
+    TUniqueIdToUniqueIdPB(finstance_id, instance_status->mutable_fragment_instance_id());
+    status.ToProto(instance_status->mutable_status());
+    instance_status->set_done(done);
+    instance_status->set_current_state(fis->current_state());
+
+    // Only send updates to insert status if fragment is finished, the coordinator waits
+    // until query execution is done to use them anyhow.
+    RuntimeState* state = fis->runtime_state();
+    if (done) {
+      state->dml_exec_state()->ToProto(instance_status->mutable_dml_exec_status());
+    }
+
+    // Send new errors to coordinator
+    state->GetUnreportedErrors(instance_status->mutable_error_log());
+
+    // Debug action to simulate failure to serialize the profile.
+    if (!DebugAction(query_options(), "REPORT_EXEC_STATUS_PROFILE").ok()) {
+      DCHECK(profile_buf == nullptr);
+      return;
+    }
+
+    // Generate the runtime profile.
+    DCHECK(fis->profile() != nullptr);
+    TRuntimeProfileTree thrift_profile;
+    fis->profile()->ToThrift(&thrift_profile);
+    Status serialize_status =
+        serializer->SerializeToBuffer(&thrift_profile, profile_len, profile_buf);
+    if (UNLIKELY(!serialize_status.ok() ||
+            *profile_len > FLAGS_rpc_max_message_size)) {
+      profile_buf = nullptr;
+      LOG(ERROR) << Substitute("Failed to create $0profile for query fragment $1: "
+          "status=$2 len=$3", done ? "final " : "", PrintId(finstance_id),
+          serialize_status.ok() ? "OK" : serialize_status.GetDetail(), *profile_len);
+    }
+  }
+}
+
+// TODO: rethink whether 'done' can be inferred from 'status' or 'query_status_'.
 void QueryState::ReportExecStatusAux(bool done, const Status& status,
     FragmentInstanceState* fis, bool instances_started) {
   // if we're reporting an error, we're done
@@ -251,56 +323,62 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
   // This will send a report even if we are cancelled.  If the query completed correctly
   // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
   // be waiting for a final report and profile.
-  TReportExecStatusParams params;
-  params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_query_id(query_ctx().query_id);
-  DCHECK(rpc_params().__isset.coord_state_idx);
-  params.__set_coord_state_idx(rpc_params().coord_state_idx);
-  status.SetTStatus(&params);
-
-  if (fis != nullptr) {
-    // create status for 'fis'
-    params.instance_exec_status.emplace_back();
-    params.__isset.instance_exec_status = true;
-    TFragmentInstanceExecStatus& instance_status = params.instance_exec_status.back();
-    instance_status.__set_fragment_instance_id(fis->instance_id());
-    status.SetTStatus(&instance_status);
-    instance_status.__set_done(done);
-    instance_status.__set_current_state(fis->current_state());
+  ReportExecStatusRequestPB report;
 
-    DCHECK(fis->profile() != nullptr);
-    fis->profile()->ToThrift(&instance_status.profile);
-    instance_status.__isset.profile = true;
-
-    // Only send updates to insert status if fragment is finished, the coordinator waits
-    // until query execution is done to use them anyhow.
-    RuntimeState* state = fis->runtime_state();
-    if (done && state->dml_exec_state()->ToThrift(&params.insert_exec_status)) {
-      params.__isset.insert_exec_status = true;
-    }
-    // Send new errors to coordinator
-    state->GetUnreportedErrors(&params.error_log);
-    params.__isset.error_log = (params.error_log.size() > 0);
-  }
+  // Serialize the runtime profile with Thrift to 'profile_buf'. Note that the
+  // serialization output is owned by 'serializer' so this must be alive until RPC
+  // is done.
+  ThriftSerializer serializer(true);
+  uint8_t* profile_buf = nullptr;
+  uint32_t profile_len = 0;
+  ConstructReport(done, status, fis, &report, &serializer, &profile_buf, &profile_len);
 
-  Status rpc_status;
-  TReportExecStatusResult res;
-  DCHECK_EQ(res.status.status_code, TErrorCode::OK);
   // Try to send the RPC 3 times before failing. Sleep for 100ms between retries.
   // It's safe to retry the RPC as the coordinator handles duplicate RPC messages.
-  Status client_status;
+  Status rpc_status;
+  Status result_status;
   for (int i = 0; i < 3; ++i) {
-    ImpalaBackendConnection client(ExecEnv::GetInstance()->impalad_client_cache(),
-        query_ctx().coord_address, &client_status);
-    if (client_status.ok()) {
-      rpc_status = client.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res);
-      if (rpc_status.ok()) break;
+    RpcController rpc_controller;
+
+    // The profile is a thrift structure serialized to a string and sent as a sidecar.
+    // We keep the runtime profile as Thrift object as Impala client still communicates
+    // with Impala server with Thrift RPC.
+    //
+    // Note that the sidecar is created with faststring so the ownership of the Thrift
+    // profile buffer is transferred to RPC layer and it is freed after the RPC payload
+    // is sent. If serialization of the profile to RPC sidecar fails, we will proceed
+    // without the profile so that the coordinator can still get the status instead of
+    // hitting IMPALA-2990.
+    if (profile_buf != nullptr) {
+      unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>();
+      sidecar_buf->assign_copy(profile_buf, profile_len);
+      unique_ptr<RpcSidecar> sidecar = RpcSidecar::FromFaststring(move(sidecar_buf));
+
+      int sidecar_idx;
+      kudu::Status sidecar_status =
+          rpc_controller.AddOutboundSidecar(move(sidecar), &sidecar_idx);
+      if (LIKELY(sidecar_status.ok())) {
+        report.set_thrift_profiles_sidecar_idx(sidecar_idx);
+      } else {
+        LOG(DFATAL) <<
+            FromKuduStatus(sidecar_status, "Failed to add sidecar").GetDetail();
+      }
     }
+
+    rpc_controller.set_timeout(
+        MonoDelta::FromMilliseconds(FLAGS_backend_client_rpc_timeout_ms));
+    ReportExecStatusResponsePB resp;
+    rpc_status = FromKuduStatus(proxy_->ReportExecStatus(report, &resp, &rpc_controller),
+        "ReportExecStatus() RPC failed");
+    result_status = Status(resp.status());
+    if (rpc_status.ok()) break;
+    //TODO: Consider exponential backoff.
     if (i < 2) SleepForMs(FLAGS_report_status_retry_interval_ms);
+    LOG(WARNING) <<
+        Substitute("Retrying ReportExecStatus() RPC for query $0", PrintId(query_id()));
   }
-  Status result_status(res.status);
-  if ((!client_status.ok() || !rpc_status.ok() || !result_status.ok()) &&
-      instances_started) {
+
+  if ((!rpc_status.ok() || !result_status.ok()) && instances_started) {
     // TODO: should we try to keep rpc_status for the final report? (but the final
     // report, following this Cancel(), may not succeed anyway.)
     // TODO: not keeping an error status here means that all instances might
@@ -308,11 +386,7 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
     // TODO: Fix IMPALA-2990. Cancelling fragment instances without sending the
     // ReporExecStatus RPC may cause query to hang as the coordinator may not be aware
     // of the cancellation. Remove the log statements once IMPALA-2990 is fixed.
-    if (!client_status.ok()) {
-      LOG(ERROR) << "Cancelling fragment instances due to failure to obtain a connection "
-                 << "to the coordinator. (" << client_status.GetDetail()
-                 << "). Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
-    } else if (!rpc_status.ok()) {
+    if (!rpc_status.ok()) {
       LOG(ERROR) << "Cancelling fragment instances due to failure to reach the "
                  << "coordinator. (" << rpc_status.GetDetail()
                  << "). Query " << PrintId(query_id()) << " may hang. See IMPALA-2990.";
@@ -343,7 +417,7 @@ Status QueryState::WaitForFinish() {
 
 void QueryState::StartFInstances() {
   VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
-          << " #instances=" << rpc_params_.fragment_instance_ctxs.size();
+          << " #instances=" << exec_rpc_params_.fragment_instance_ctxs.size();
   DCHECK_GT(refcnt_.Load(), 0);
   DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
 
@@ -364,17 +438,18 @@ void QueryState::StartFInstances() {
           << "\n" << desc_tbl_->DebugString();
 
   Status thread_create_status;
-  DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0);
-  TPlanFragmentCtx* fragment_ctx = &rpc_params_.fragment_ctxs[0];
+  DCHECK_GT(exec_rpc_params_.fragment_ctxs.size(), 0);
+  TPlanFragmentCtx* fragment_ctx = &exec_rpc_params_.fragment_ctxs[0];
   int fragment_ctx_idx = 0;
-  int num_unstarted_instances = rpc_params_.fragment_instance_ctxs.size();
+  int num_unstarted_instances = exec_rpc_params_.fragment_instance_ctxs.size();
   fragment_events_start_time_ = MonotonicStopWatch::Now();
-  for (const TPlanFragmentInstanceCtx& instance_ctx: rpc_params_.fragment_instance_ctxs) {
+  for (const TPlanFragmentInstanceCtx& instance_ctx :
+           exec_rpc_params_.fragment_instance_ctxs) {
     // determine corresponding TPlanFragmentCtx
     if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) {
       ++fragment_ctx_idx;
-      DCHECK_LT(fragment_ctx_idx, rpc_params_.fragment_ctxs.size());
-      fragment_ctx = &rpc_params_.fragment_ctxs[fragment_ctx_idx];
+      DCHECK_LT(fragment_ctx_idx, exec_rpc_params_.fragment_ctxs.size());
+      fragment_ctx = &exec_rpc_params_.fragment_ctxs[fragment_ctx_idx];
       // we expect fragment and instance contexts to follow the same order
       DCHECK_EQ(fragment_ctx->fragment.idx, instance_ctx.fragment_idx);
     }
@@ -472,7 +547,7 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
   VLOG_QUERY << "Executing instance. instance_id=" << PrintId(fis->instance_id())
       << " fragment_idx=" << fis->instance_ctx().fragment_idx
       << " per_fragment_instance_idx=" << fis->instance_ctx().per_fragment_instance_idx
-      << " coord_state_idx=" << rpc_params().coord_state_idx
+      << " coord_state_idx=" << exec_rpc_params().coord_state_idx
       << " #in-flight="
       << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
   Status status = fis->Exec();

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 9810156..da984b2 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -33,12 +33,15 @@
 
 namespace impala {
 
+class ControlServiceProxy;
 class FragmentInstanceState;
 class InitialReservations;
 class MemTracker;
+class ReportExecStatusRequestPB;
 class ReservationTracker;
 class RuntimeState;
 class ScannerMemLimiter;
+class ThriftSerializer;
 
 /// Central class for all backend execution state (example: the FragmentInstanceStates
 /// of the individual fragment instances) created for a particular query.
@@ -126,7 +129,7 @@ class QueryState {
 
   /// The following getters are only valid after Init().
   ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }
-  const TExecQueryFInstancesParams& rpc_params() const { return rpc_params_; }
+  const TExecQueryFInstancesParams& exec_rpc_params() const { return exec_rpc_params_; }
 
   /// The following getters are only valid after Init() and should be called only from
   /// the backend execution (ie. not the coordinator side, since they require holding
@@ -162,7 +165,7 @@ class QueryState {
   ///
   /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
   /// The remaining public functions must be called only after Init().
-  Status Init(const TExecQueryFInstancesParams& rpc_params) WARN_UNUSED_RESULT;
+  Status Init(const TExecQueryFInstancesParams& exec_rpc_params) WARN_UNUSED_RESULT;
 
   /// Performs the runtime-intensive parts of initial setup and starts all fragment
   /// instances belonging to this query. Each instance receives its own execution
@@ -318,10 +321,14 @@ class QueryState {
   /// the top-level MemTracker for this query (owned by obj_pool_), created in c'tor
   MemTracker* query_mem_tracker_ = nullptr;
 
-  /// set in Prepare(); rpc_params_.query_ctx is *not* set to avoid duplication
-  /// with query_ctx_
+  /// The RPC proxy used when reporting status of fragment instances to coordinator.
+  /// Set in Init().
+  std::unique_ptr<ControlServiceProxy> proxy_;
+
+  /// Set in Init(); exec_rpc_params_.query_ctx is *not* set to avoid duplication
+  /// with query_ctx_.
   /// TODO: find a way not to have to copy this
-  TExecQueryFInstancesParams rpc_params_;
+  TExecQueryFInstancesParams exec_rpc_params_;
 
   /// Buffer reservation for this query (owned by obj_pool_). Set in Init().
   ReservationTracker* buffer_reservation_ = nullptr;
@@ -408,6 +415,13 @@ class QueryState {
   /// thread-safe.
   void ReleaseBackendResources();
 
+  /// Helper for ReportExecStatus() to construct a status report to be sent to the
+  /// coordinator. If 'fis' is not NULL, the runtime profile is serialized by the Thrift
+  /// serializer 'serializer' and stored in 'profile_buf'.
+  void ConstructReport(bool done, const Status& status,
+      FragmentInstanceState* fis, ReportExecStatusRequestPB* report,
+      ThriftSerializer* serializer, uint8_t** profile_buf, uint32_t* len);
+
   /// Same behavior as ReportExecStatus().
   /// Cancel on error only if instances_started is true.
   void ReportExecStatusAux(bool done, const Status& status, FragmentInstanceState* fis,

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 127abb4..9681aac 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -186,11 +186,6 @@ string RuntimeState::ErrorLog() {
   return PrintErrorMapToString(error_log_);
 }
 
-void RuntimeState::GetErrors(ErrorLogMap* errors) {
-  lock_guard<SpinLock> l(error_log_lock_);
-  *errors = error_log_;
-}
-
 bool RuntimeState::LogError(const ErrorMsg& message, int vlog_level) {
   lock_guard<SpinLock> l(error_log_lock_);
   // All errors go to the log, unreported_error_count_ is counted independently of the
@@ -204,9 +199,12 @@ bool RuntimeState::LogError(const ErrorMsg& message, int vlog_level) {
   return false;
 }
 
-void RuntimeState::GetUnreportedErrors(ErrorLogMap* new_errors) {
+void RuntimeState::GetUnreportedErrors(ErrorLogMapPB* new_errors) {
+  new_errors->clear();
   lock_guard<SpinLock> l(error_log_lock_);
-  *new_errors = error_log_;
+  for (const ErrorLogMap::value_type& v : error_log_) {
+    (*new_errors)[v.first] = v.second;
+  }
   // Reset all messages, but keep all already reported keys so that we do not report the
   // same errors multiple times.
   ClearErrorMap(error_log_);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 6ca9574..66985a4 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -27,6 +27,7 @@
 #include "common/global-types.h"  // for PlanNodeId
 #include "runtime/client-cache-types.h"
 #include "runtime/dml-exec-state.h"
+#include "util/error-util-internal.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
@@ -212,12 +213,10 @@ class RuntimeState {
   /// Returns the error log lines as a string joined with '\n'.
   std::string ErrorLog();
 
-  /// Copy error_log_ to *errors
-  void GetErrors(ErrorLogMap* errors);
-
-  /// Append all accumulated errors since the last call to this function to new_errors to
-  /// be sent back to the coordinator
-  void GetUnreportedErrors(ErrorLogMap* new_errors);
+  /// Clear 'new_errors' and append all accumulated errors since the last call to this
+  /// function to 'new_errors' to be sent back to the coordinator. This has the side
+  /// effect of clearing out the internal error log map once this function returns.
+  void GetUnreportedErrors(ErrorLogMapPB* new_errors);
 
   /// Given an error message, determine whether execution should be aborted and, if so,
   /// return the corresponding error status. Otherwise, log the error and return

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index ef5e978..1208c35 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -20,17 +20,22 @@
 #include <limits>
 #include <memory>
 
+#include "gutil/strings/substitute.h"
+#include "rpc/rpc-mgr.h"
 #include "runtime/query-exec-mgr.h"
 #include "runtime/tmp-file-mgr.h"
 #include "runtime/query-state.h"
+#include "service/control-service.h"
 #include "util/disk-info.h"
 #include "util/impalad-metrics.h"
-#include "gutil/strings/substitute.h"
+
 #include "common/names.h"
 
 using boost::scoped_ptr;
 using std::numeric_limits;
 
+DECLARE_string(hostname);
+
 namespace impala {
 
 scoped_ptr<MetricGroup> TestEnv::static_metrics_;
@@ -50,7 +55,6 @@ Status TestEnv::Init() {
   // Populate the ExecEnv state that the backend tests need.
   exec_env_->mem_tracker_.reset(new MemTracker(-1, "Process"));
   RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init());
-  exec_env_->metrics_.reset(new MetricGroup("test-env-metrics"));
   exec_env_->tmp_file_mgr_.reset(new TmpFileMgr);
   if (have_tmp_file_mgr_args_) {
     RETURN_IF_ERROR(
@@ -60,6 +64,15 @@ Status TestEnv::Init() {
   }
   exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_,
       static_cast<int64_t>(0.1 * buffer_pool_capacity_));
+
+  // Initialize RpcMgr and control service.
+  IpAddr ip_address;
+  RETURN_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address));
+  exec_env_->krpc_address_.__set_hostname(ip_address);
+  RETURN_IF_ERROR(exec_env_->rpc_mgr_->Init());
+  exec_env_->control_svc_.reset(new ControlService(exec_env_->rpc_metrics_));
+  RETURN_IF_ERROR(exec_env_->control_svc_->Init());
+
   return Status::OK();
 }
 
@@ -113,6 +126,8 @@ Status TestEnv::CreateQueryState(
   query_ctx.query_id.hi = 0;
   query_ctx.query_id.lo = query_id;
   query_ctx.request_pool = "test-pool";
+  query_ctx.coord_address = exec_env_->configured_backend_address_;
+  query_ctx.coord_krpc_address = exec_env_->krpc_address_;
   TQueryOptions* query_options_to_use = &query_ctx.client_request.query_options;
   int64_t mem_limit =
       query_options_to_use->__isset.mem_limit && query_options_to_use->mem_limit > 0 ?
@@ -133,7 +148,8 @@ Status TestEnv::CreateQueryState(
       vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
   RETURN_IF_ERROR(qs->Init(rpc_params));
   FragmentInstanceState* fis = qs->obj_pool()->Add(
-      new FragmentInstanceState(qs, qs->rpc_params().fragment_ctxs[0], qs->rpc_params().fragment_instance_ctxs[0]));
+      new FragmentInstanceState(qs, qs->exec_rpc_params().fragment_ctxs[0],
+          qs->exec_rpc_params().fragment_instance_ctxs[0]));
   RuntimeState* rs = qs->obj_pool()->Add(
       new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
   runtime_states_.push_back(rs);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 000429f..130d0f1 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -910,7 +910,7 @@ void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
     topic_delta.topic_entries.push_back(TTopicItem());
     TTopicItem& topic_item = topic_delta.topic_entries.back();
     topic_item.key = MakePoolTopicKey(pool_name, host_id_);
-    Status status = thrift_serializer_.Serialize(&stats->local_stats(),
+    Status status = thrift_serializer_.SerializeToString(&stats->local_stats(),
         &topic_item.value);
     if (!status.ok()) {
       LOG(WARNING) << "Failed to serialize query pool stats: " << status.GetDetail();

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 56cf813..30c3ad1 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -597,7 +597,7 @@ void SchedulerWrapper::AddHostToTopicDelta(const Host& host, TTopicDelta* delta)
   TTopicItem item;
   item.key = host.ip;
   ThriftSerializer serializer(false);
-  Status status = serializer.Serialize(&be_desc, &item.value);
+  Status status = serializer.SerializeToString(&be_desc, &item.value);
   DCHECK(status.ok());
 
   // Add to topic delta.

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index 71704c8..67b0548 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -22,12 +22,15 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/service")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/service")
 
 # Mark the protobuf file as generated
-set_source_files_properties(${DATA_STREAM_SVC_PROTO_SRCS} PROPERTIES GENERATED TRUE)
+set_source_files_properties(${CONTROL_SERVICE_PROTO_SRCS} PROPERTIES GENERATED TRUE)
+set_source_files_properties(${DATA_STREAM_SERVICE_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 
 add_library(Service
   child-query.cc
   client-request-state.cc
-  ${DATA_STREAM_SVC_PROTO_SRCS}
+  ${CONTROL_SERVICE_PROTO_SRCS}
+  control-service.cc
+  ${DATA_STREAM_SERVICE_PROTO_SRCS}
   data-stream-service.cc
   frontend.cc
   fe-support.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index c71429a..e80e3ae 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1249,9 +1249,9 @@ beeswax::QueryState::type ClientRequestState::BeeswaxQueryState() const {
 // to call concurrently with Coordinator::Exec(). See comments for 'coord_' and
 // 'coord_exec_called_' for more details.
 Status ClientRequestState::UpdateBackendExecStatus(
-    const TReportExecStatusParams& params) {
+    const ReportExecStatusRequestPB& request, const TRuntimeProfileTree& thrift_profile) {
   DCHECK(coord_.get());
-  return coord_->UpdateBackendExecStatus(params);
+  return coord_->UpdateBackendExecStatus(request, thrift_profile);
 }
 
 void ClientRequestState::UpdateFilter(const TUpdateFilterParams& params) {

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 7ff5285..5bb1839 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -38,14 +38,16 @@
 
 namespace impala {
 
-class ExecEnv;
+class ClientRequestStateCleaner;
 class Coordinator;
-class RuntimeState;
-class RowBatch;
+class ExecEnv;
 class Expr;
-class TupleRow;
 class Frontend;
-class ClientRequestStateCleaner;
+class ReportExecStatusRequestPB;
+class RowBatch;
+class RuntimeState;
+class TRuntimeProfileTree;
+class TupleRow;
 enum class AdmissionOutcome;
 
 /// Execution state of the client-facing side a query. This captures everything
@@ -158,7 +160,8 @@ class ClientRequestState {
   /// coordinator even before it becomes accessible through GetCoordinator(). These
   /// methods should be used instead of calling them directly using the coordinator
   /// object.
-  Status UpdateBackendExecStatus(const TReportExecStatusParams& params);
+  Status UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
+      const TRuntimeProfileTree& thrift_profile) WARN_UNUSED_RESULT;
   void UpdateFilter(const TUpdateFilterParams& params);
 
   ImpalaServer::SessionState* session() const { return session_.get(); }

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/control-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
new file mode 100644
index 0000000..ca2564b
--- /dev/null
+++ b/be/src/service/control-service.cc
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/control-service.h"
+
+#include "common/constant-strings.h"
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "rpc/rpc-mgr.h"
+#include "rpc/rpc-mgr.inline.h"
+#include "runtime/coordinator.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "service/client-request-state.h"
+#include "service/impala-server.h"
+#include "testutil/fault-injection-util.h"
+#include "util/debug-util.h"
+#include "util/memory-metrics.h"
+#include "util/parse-util.h"
+#include "util/uid-util.h"
+
+#include "gen-cpp/control_service.pb.h"
+#include "gen-cpp/control_service.proxy.h"
+#include "gen-cpp/RuntimeProfile_types.h"
+
+#include "common/names.h"
+
+using kudu::rpc::RpcContext;
+
+static const string QUEUE_LIMIT_MSG = "(Advanced) Limit on RPC payloads consumption for "
+    "ControlService. " + Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
+DEFINE_string(control_service_queue_mem_limit, "50MB", QUEUE_LIMIT_MSG.c_str());
+DEFINE_int32(control_service_num_svc_threads, 0, "Number of threads for processing "
+    "control service's RPCs. if left at default value 0, it will be set to number of "
+    "CPU cores. Set it to a positive value to change from the default.");
+
+namespace impala {
+
+ControlService::ControlService(MetricGroup* metric_group)
+  : ControlServiceIf(ExecEnv::GetInstance()->rpc_mgr()->metric_entity(),
+        ExecEnv::GetInstance()->rpc_mgr()->result_tracker()) {
+  MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
+  bool is_percent; // not used
+  int64_t bytes_limit = ParseUtil::ParseMemSpec(FLAGS_control_service_queue_mem_limit,
+      &is_percent, process_mem_tracker->limit());
+  if (bytes_limit <= 0) {
+    CLEAN_EXIT_WITH_ERROR(Substitute("Invalid mem limit for control service queue: "
+        "'$0'.", FLAGS_control_service_queue_mem_limit));
+  }
+  mem_tracker_.reset(new MemTracker(
+      bytes_limit, "Control Service Queue", process_mem_tracker));
+  MemTrackerMetric::CreateMetrics(metric_group, mem_tracker_.get(), "ControlService");
+}
+
+Status ControlService::Init() {
+  int num_svc_threads = FLAGS_control_service_num_svc_threads > 0 ?
+      FLAGS_control_service_num_svc_threads : CpuInfo::num_cores();
+  // The maximum queue length is set to maximum 32-bit value. Its actual capacity is
+  // bound by memory consumption against 'mem_tracker_'.
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->rpc_mgr()->RegisterService(num_svc_threads,
+      std::numeric_limits<int32_t>::max(), this, mem_tracker_.get()));
+  return Status::OK();
+}
+
+Status ControlService::GetProxy(const TNetworkAddress& address, const string& hostname,
+    unique_ptr<ControlServiceProxy>* proxy) {
+  // Create a ControlService proxy to the destination.
+  return ExecEnv::GetInstance()->rpc_mgr()->GetProxy(address, hostname, proxy);
+}
+
+bool ControlService::Authorize(const google::protobuf::Message* req,
+    google::protobuf::Message* resp, RpcContext* context) {
+  return ExecEnv::GetInstance()->rpc_mgr()->Authorize("ControlService", context,
+      mem_tracker_.get());
+}
+
+Status ControlService::GetProfile(const ReportExecStatusRequestPB& request,
+    const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
+    TRuntimeProfileTree* thrift_profile) {
+  // Debug action to simulate deserialization failure.
+  RETURN_IF_ERROR(DebugAction(request_state.query_options(),
+      "REPORT_EXEC_STATUS_PROFILE"));
+  kudu::Slice thrift_profile_slice;
+  KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(
+      request.thrift_profiles_sidecar_idx(), &thrift_profile_slice),
+      "Failed to get thrift profile sidecar");
+  uint32_t len = thrift_profile_slice.size();
+  RETURN_IF_ERROR(DeserializeThriftMsg(thrift_profile_slice.data(),
+      &len, true, thrift_profile));
+  return Status::OK();
+}
+
+void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
+    ReportExecStatusResponsePB* response, kudu::rpc::RpcContext* rpc_context) {
+  const TUniqueId query_id = ProtoToQueryId(request->query_id());
+  shared_ptr<ClientRequestState> request_state =
+      ExecEnv::GetInstance()->impala_server()->GetClientRequestState(query_id);
+
+  if (request_state.get() == nullptr) {
+    // This is expected occasionally (since a report RPC might be in flight while
+    // cancellation is happening). Return an error to the caller to get it to stop.
+    const string& err = Substitute("ReportExecStatus(): Received report for unknown "
+        "query ID (probably closed or cancelled): $0", PrintId(query_id));
+    VLOG(1) << err;
+    RespondAndReleaseRpc(Status::Expected(err), response, rpc_context);
+    return;
+  }
+
+  // This failpoint is to allow jitter to be injected.
+  DebugActionNoFail(request_state->query_options(), "REPORT_EXEC_STATUS_DELAY");
+
+  // The runtime profile is sent as a Thrift serialized buffer via sidecar. Get the
+  // sidecar and deserialize the thrift profile if there is any. The sender may have
+  // failed to serialize the Thrift profile so an empty thrift profile is valid.
+  // TODO: Fix IMPALA-7232 to indicate incomplete profile in this case.
+  TRuntimeProfileTree thrift_profile;
+  if (LIKELY(request->has_thrift_profiles_sidecar_idx())) {
+    const Status& profile_status =
+        GetProfile(*request, *request_state.get(), rpc_context, &thrift_profile);
+    if (UNLIKELY(!profile_status.ok())) {
+      LOG(ERROR) << Substitute("ReportExecStatus(): Failed to deserialize profile "
+          "for query ID $0: $1", PrintId(request_state->query_id()),
+          profile_status.GetDetail());
+      // Do not expose a partially deserialized profile.
+      TRuntimeProfileTree empty_profile;
+      swap(thrift_profile, empty_profile);
+    }
+  }
+
+  Status resp_status = request_state->UpdateBackendExecStatus(*request, thrift_profile);
+  RespondAndReleaseRpc(resp_status, response, rpc_context);
+}
+
+template<typename ResponsePBType>
+void ControlService::RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
+    kudu::rpc::RpcContext* rpc_context) {
+  status.ToProto(response->mutable_status());
+  // Release the memory against the control service's memory tracker.
+  mem_tracker_->Release(rpc_context->GetTransferSize());
+  rpc_context->RespondSuccess();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/control-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h
new file mode 100644
index 0000000..ca2828a
--- /dev/null
+++ b/be/src/service/control-service.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_SERVICE_CONTROL_SERVICE_H
+#define IMPALA_SERVICE_CONTROL_SERVICE_H
+
+#include "gen-cpp/control_service.service.h"
+
+#include "common/status.h"
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
+namespace impala {
+
+class ClientRequestState;
+class ControlServiceProxy;
+class MemTracker;
+class MetricGroup;
+class TRuntimeProfileTree;
+
+/// This singleton class implements service for managing execution of queries in Impala.
+class ControlService : public ControlServiceIf {
+  public:
+   ControlService(MetricGroup* metric_group);
+
+   /// Initializes the service by registering it with the singleton RPC manager.
+   /// This mustn't be called until RPC manager has been initialized.
+   Status Init();
+
+   /// Returns true iff the 'remote_user' in 'context' is authorized to access
+   /// ControlService. On denied access, the RPC is replied to with an error message.
+   /// Authorization is enforced only when Kerberos is enabled.
+   virtual bool Authorize(const google::protobuf::Message* req,
+       google::protobuf::Message* resp, kudu::rpc::RpcContext* rpc_context) override;
+
+   /// Updates the coordinator with the query status of the backend encoded in 'req'.
+   virtual void ReportExecStatus(const ReportExecStatusRequestPB *req,
+       ReportExecStatusResponsePB *resp, kudu::rpc::RpcContext *rpc_context) override;
+
+  /// Gets a ControlService proxy to a server with 'address' and 'hostname'.
+  /// The newly created proxy is returned in 'proxy'. Returns error status on failure.
+  static Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
+      std::unique_ptr<ControlServiceProxy>* proxy);
+
+  private:
+   /// Tracks the memory usage of payload in the service queue.
+   std::unique_ptr<MemTracker> mem_tracker_;
+
+   /// Helper for deserializing runtime profile from the sidecar attached in the inbound
+   /// call within 'rpc_context'. On success, returns the deserialized profile in
+   /// 'thrift_profile'. On failure, returns the error status;
+   static Status GetProfile(const ReportExecStatusRequestPB& request,
+       const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
+       TRuntimeProfileTree* thrift_profile);
+
+   /// Helper for serializing 'status' as part of 'response'. Also releases memory
+   /// of the RPC payload previously accounted towards the internal memory tracker.
+   template<typename ResponsePBType>
+   void RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
+       kudu::rpc::RpcContext* rpc_context);
+};
+
+} // namespace impala
+
+#endif // IMPALA_SERVICE_CONTROL_SERVICE_H

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 723d7ca..9dbde48 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -25,14 +25,17 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/util/monotime.h"
 #include "rpc/rpc-mgr.h"
+#include "rpc/rpc-mgr.inline.h"
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "util/memory-metrics.h"
 #include "util/parse-util.h"
 #include "testutil/fault-injection-util.h"
 
 #include "gen-cpp/data_stream_service.pb.h"
+#include "gen-cpp/data_stream_service.proxy.h"
 
 #include "common/names.h"
 
@@ -40,12 +43,12 @@ using kudu::rpc::RpcContext;
 using kudu::MonoDelta;
 using kudu::MonoTime;
 
-static const string queue_limit_msg = "(Advanced) Limit on RPC payloads consumption for "
+static const string QUEUE_LIMIT_MSG = "(Advanced) Limit on RPC payloads consumption for "
     "DataStreamService. " + Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
-DEFINE_string(datastream_service_queue_mem_limit, "5%", queue_limit_msg.c_str());
+DEFINE_string(datastream_service_queue_mem_limit, "5%", QUEUE_LIMIT_MSG.c_str());
 DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for processing "
     "datastream services' RPCs. If left at default value 0, it will be set to number of "
-    "CPU cores");
+    "CPU cores.  Set it to a positive value to change from the default.");
 
 namespace impala {
 
@@ -53,9 +56,13 @@ DataStreamService::DataStreamService(MetricGroup* metric_group)
   : DataStreamServiceIf(ExecEnv::GetInstance()->rpc_mgr()->metric_entity(),
         ExecEnv::GetInstance()->rpc_mgr()->result_tracker()) {
   MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
-  bool is_percent;
+  bool is_percent; // not used
   int64_t bytes_limit = ParseUtil::ParseMemSpec(FLAGS_datastream_service_queue_mem_limit,
       &is_percent, process_mem_tracker->limit());
+  if (bytes_limit <= 0) {
+    CLEAN_EXIT_WITH_ERROR(Substitute("Invalid mem limit for data stream service queue: "
+        "'$0'.", FLAGS_datastream_service_queue_mem_limit));
+  }
   mem_tracker_.reset(new MemTracker(
       bytes_limit, "Data Stream Service Queue", process_mem_tracker));
   MemTrackerMetric::CreateMetrics(metric_group, mem_tracker_.get(), "DataStreamService");
@@ -71,6 +78,12 @@ Status DataStreamService::Init() {
   return Status::OK();
 }
 
+Status DataStreamService::GetProxy(const TNetworkAddress& address, const string& hostname,
+    unique_ptr<DataStreamServiceProxy>* proxy) {
+  // Create a DataStreamService proxy to the destination.
+  return ExecEnv::GetInstance()->rpc_mgr()->GetProxy(address, hostname, proxy);
+}
+
 bool DataStreamService::Authorize(const google::protobuf::Message* req,
     google::protobuf::Message* resp, RpcContext* context) {
   return ExecEnv::GetInstance()->rpc_mgr()->Authorize("DataStreamService", context,

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index 5fdf6dd..539974c 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -21,7 +21,6 @@
 #include "gen-cpp/data_stream_service.service.h"
 
 #include "common/status.h"
-#include "runtime/mem-tracker.h"
 
 namespace kudu {
 namespace rpc {
@@ -31,7 +30,9 @@ class RpcContext;
 
 namespace impala {
 
-class RpcMgr;
+class DataStreamServiceProxy;
+class MemTracker;
+class MetricGroup;
 
 /// This is singleton class which provides data transmission services between fragment
 /// instances. The client for this service is implemented in KrpcDataStreamSender.
@@ -75,6 +76,11 @@ class DataStreamService : public DataStreamServiceIf {
 
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
 
+  /// Gets a DataStreamService proxy to a server with 'address' and 'hostname'.
+  /// The newly created proxy is returned in 'proxy'. Returns error status on failure.
+  static Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
+      std::unique_ptr<DataStreamServiceProxy>* proxy);
+
  private:
   /// Tracks the memory usage of the payloads in the service queue.
   std::unique_ptr<MemTracker> mem_tracker_;

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/impala-internal-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 3e7b222..08c708e 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -79,14 +79,6 @@ void ImpalaInternalService::CancelQueryFInstances(
   qs->Cancel();
 }
 
-void ImpalaInternalService::ReportExecStatus(TReportExecStatusResult& return_val,
-    const TReportExecStatusParams& params) {
-  FAULT_INJECTION_RPC_DELAY(RPC_REPORTEXECSTATUS);
-  DCHECK(params.__isset.query_id);
-  DCHECK(params.__isset.coord_state_idx);
-  impala_server_->ReportExecStatus(return_val, params);
-}
-
 void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
     const TUpdateFilterParams& params) {
   FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 971670a..376b615 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -35,8 +35,6 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
       const TExecQueryFInstancesParams& params);
   virtual void CancelQueryFInstances(TCancelQueryFInstancesResult& return_val,
       const TCancelQueryFInstancesParams& params);
-  virtual void ReportExecStatus(TReportExecStatusResult& return_val,
-      const TReportExecStatusParams& params);
   virtual void UpdateFilter(TUpdateFilterResult& return_val,
       const TUpdateFilterParams& params);
   virtual void PublishFilter(TPublishFilterResult& return_val,

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 9fa05e9..ec39b04 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1008,11 +1008,12 @@ Status ImpalaServer::ExecuteInternal(
 }
 
 void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
-  PrepareQueryContext(GetThriftBackendAddress(), query_ctx);
+  PrepareQueryContext(GetThriftBackendAddress(),
+      ExecEnv::GetInstance()->krpc_address(), query_ctx);
 }
 
-void ImpalaServer::PrepareQueryContext(
-    const TNetworkAddress& backend_addr, TQueryCtx* query_ctx) {
+void ImpalaServer::PrepareQueryContext(const TNetworkAddress& backend_addr,
+    const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx) {
   query_ctx->__set_pid(getpid());
   int64_t now_us = UnixMicros();
   const Timezone& utc_tz = TimezoneDatabase::GetUtcTimezone();
@@ -1031,6 +1032,7 @@ void ImpalaServer::PrepareQueryContext(
   query_ctx->__set_now_string(ToStringFromUnixMicros(now_us, *local_tz));
   query_ctx->__set_start_unix_millis(now_us / MICROS_PER_MILLI);
   query_ctx->__set_coord_address(backend_addr);
+  query_ctx->__set_coord_krpc_address(krpc_addr);
   query_ctx->__set_local_time_zone(local_tz_name);
 
   // Creating a random_generator every time is not free, but
@@ -1336,27 +1338,6 @@ Status ImpalaServer::GetSessionState(const TUniqueId& session_id,
   }
 }
 
-void ImpalaServer::ReportExecStatus(
-    TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {
-  VLOG_FILE << "ReportExecStatus() coord_state_idx=" << params.coord_state_idx;
-  // TODO: implement something more efficient here, we're currently
-  // acquiring/releasing the map lock and doing a map lookup for
-  // every report (assign each query a local int32_t id and use that to index into a
-  // vector of ClientRequestStates, w/o lookup or locking?)
-  shared_ptr<ClientRequestState> request_state =
-      GetClientRequestState(params.query_id);
-  if (request_state.get() == nullptr) {
-    // This is expected occasionally (since a report RPC might be in flight while
-    // cancellation is happening). Return an error to the caller to get it to stop.
-    const string& err = Substitute("ReportExecStatus(): Received report for unknown "
-        "query ID (probably closed or cancelled): $0", PrintId(params.query_id));
-    VLOG(1) << err;
-    Status::Expected(err).SetTStatus(&return_val);
-    return;
-  }
-  request_state->UpdateBackendExecStatus(params).SetTStatus(&return_val);
-}
-
 void ImpalaServer::InitializeConfigVariables() {
   // Set idle_session_timeout here to let the SET command return the value of
   // the command line option FLAGS_idle_session_timeout
@@ -1842,7 +1823,8 @@ void ImpalaServer::AddLocalBackendToStatestore(
 
   TTopicItem& item = update.topic_entries.back();
   item.key = local_backend_id;
-  Status status = thrift_serializer_.Serialize(&local_backend_descriptor, &item.value);
+  Status status = thrift_serializer_.SerializeToString(&local_backend_descriptor,
+      &item.value);
   if (!status.ok()) {
     LOG(WARNING) << "Failed to serialize Impala backend descriptor for statestore topic:"
                  << " " << status.GetDetail();

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 1deb7c2..0f4392e 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -296,8 +296,6 @@ class ImpalaServer : public ImpalaServiceIf,
       const apache::hive::service::cli::thrift::TRenewDelegationTokenReq& req);
 
   /// ImpalaInternalService rpcs
-  void ReportExecStatus(TReportExecStatusResult& return_val,
-      const TReportExecStatusParams& params);
   void UpdateFilter(TUpdateFilterResult& return_val,
       const TUpdateFilterParams& params);
 
@@ -308,7 +306,8 @@ class ImpalaServer : public ImpalaServiceIf,
   void PrepareQueryContext(TQueryCtx* query_ctx);
 
   /// Static helper for PrepareQueryContext() that is used from expr-benchmark.
-  static void PrepareQueryContext(const TNetworkAddress& backend_addr, TQueryCtx* query_ctx);
+  static void PrepareQueryContext(const TNetworkAddress& backend_addr,
+      const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx);
 
   /// SessionHandlerIf methods
 
@@ -533,6 +532,7 @@ class ImpalaServer : public ImpalaServiceIf,
  private:
   struct ExpirationEvent;
   friend class ChildQuery;
+  friend class ControlService;
   friend class ImpalaHttpHandler;
   friend struct SessionState;
   friend class ImpalaServerTest;

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 8a786e0..c09ef0b 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -45,14 +45,16 @@ using namespace impala;
 
 Status InProcessImpalaServer::StartWithEphemeralPorts(const string& statestore_host,
     int statestore_port, InProcessImpalaServer** server) {
-  // Thes flags are read directly in several places to find the address of the local
+  // These flags are read directly in several places to find the address of the local
   // backend interface.
   FLAGS_be_port = 0;
-  FLAGS_krpc_port = 0;
+  // Thrift server ctor allows port to be set to 0. Not supported with KRPC.
+  // So KRPC port must be explicitly set here.
+  FLAGS_krpc_port = FindUnusedEphemeralPort();
 
-  // Use wildcard addresses of 0 so that the servers will pick their own port.
-  *server = new InProcessImpalaServer(FLAGS_hostname, 0, 0, 0, 0, statestore_host,
-      statestore_port);
+  // Use wildcard addresses of 0 so that the Thrift servers will pick their own port.
+  *server = new InProcessImpalaServer(FLAGS_hostname, 0, FLAGS_krpc_port, 0, 0,
+      statestore_host, statestore_port);
   // Start the daemon and check if it works, if not delete the current server object and
   // pick a new set of ports
   return (*server)->StartWithClientServers(0, 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/util/container-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h
index 5cd2d0c..41093e3 100644
--- a/be/src/util/container-util.h
+++ b/be/src/util/container-util.h
@@ -126,11 +126,11 @@ const V& FindWithDefault(const boost::unordered_map<K, V>& m, const K& key,
 
 /// Merges (by summing) the values from two maps of values. The values must be
 /// native types or support operator +=.
-template<typename K, typename V>
-void MergeMapValues(const std::map<K, V>& src, std::map<K, V>* dst) {
-  for (typename std::map<K, V>::const_iterator src_it = src.begin();
+template<typename MAP_TYPE>
+void MergeMapValues(const MAP_TYPE& src, MAP_TYPE* dst) {
+  for (typename MAP_TYPE::const_iterator src_it = src.begin();
       src_it != src.end(); ++src_it) {
-    typename std::map<K, V>::iterator dst_it = dst->find(src_it->first);
+    typename MAP_TYPE::iterator dst_it = dst->find(src_it->first);
     if (dst_it == dst->end()) {
       (*dst)[src_it->first] = src_it->second;
     } else {

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/util/error-util-internal.h
----------------------------------------------------------------------
diff --git a/be/src/util/error-util-internal.h b/be/src/util/error-util-internal.h
new file mode 100644
index 0000000..0491282
--- /dev/null
+++ b/be/src/util/error-util-internal.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_ERROR_UTIL_INTERNAL_H
+#define IMPALA_UTIL_ERROR_UTIL_INTERNAL_H
+
+#include <google/protobuf/map.h>
+
+#include "gen-cpp/control_service.pb.h"
+#include "util/error-util.h"
+
+/// Factor out the following structures from 'error-util.h' to prevent circular dependency
+/// with code in kudu directory which is needed for generating 'control_service.pb.h'.
+namespace impala {
+
+/// Track log messages per error code. Using a map here instead of unordered_map
+/// to ensure the output from PrintErrorLogMap() is deterministic.
+typedef std::map<TErrorCode::type, ErrorLogEntryPB> ErrorLogMap;
+typedef google::protobuf::Map<int32_t, ErrorLogEntryPB> ErrorLogMapPB;
+
+/// Merge an error log entry 'entry' with 'error_code' into ErrorLogMap 'target_map'.
+/// General log messages are simply appended, specific errors are deduplicated by either
+/// appending a new instance or incrementing the count of an existing one.
+void MergeErrorLogEntry(const TErrorCode::type error_code,
+    const ErrorLogEntryPB& entry, ErrorLogMap* target_map);
+
+/// Merge error map m1 into m2. Calls MergerErrorLogEntry() internally.
+void MergeErrorMaps(const ErrorLogMap& m1, ErrorLogMap* m2);
+
+/// Merge protobuf error map m1 into m2. Calls MergeErrorLogEntry() internally.
+void MergeErrorMaps(const ErrorLogMapPB& m1, ErrorLogMap* m2);
+
+/// Append an error to the error map. Performs the aggregation as follows: GENERAL errors
+/// are appended to the list of GENERAL errors, to keep one item each in the map, while
+/// for all other error codes only the count is incremented and only the first message
+/// is kept as a sample.
+void AppendError(ErrorLogMap* map, const ErrorMsg& e);
+
+/// Helper method to print the contents of an ErrorMap to a stream.
+void PrintErrorMap(std::ostream* stream, const ErrorLogMap& errors);
+
+/// Reset all messages and count, but keep all keys to prevent sending already reported
+/// general errors and counting the same non-general error multiple times.
+void ClearErrorMap(ErrorLogMap& errors);
+
+/// Return the number of errors within this error maps. General errors are counted
+/// individually, while specific errors are counted once per distinct occurrence.
+size_t ErrorCount(const ErrorLogMap& errors);
+
+/// Generate a string representation of the error map. Produces the same output as
+/// PrintErrorMap, but returns a string instead of using a stream.
+std::string PrintErrorMapToString(const ErrorLogMap& errors);
+
+} // namespace impala
+
+#endif // IMPALA_UTIL_ERROR_UTIL_INTERNAL_H

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/util/error-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/error-util-test.cc b/be/src/util/error-util-test.cc
index 0c940cb..baba5ad 100644
--- a/be/src/util/error-util-test.cc
+++ b/be/src/util/error-util-test.cc
@@ -20,9 +20,11 @@
 #include "gen-cpp/Status_types.h"
 #include "gen-cpp/ErrorCodes_types.h"
 
-#include "error-util.h"
+#include "error-util-internal.h"
 #include "testutil/gtest-util.h"
 
+#include "common/names.h"
+
 namespace impala {
 
 TEST(ErrorMsg, GenericFormatting) {
@@ -41,68 +43,68 @@ TEST(ErrorMsg, GenericFormatting) {
 
 TEST(ErrorMsg, MergeMap) {
   ErrorLogMap left, right;
-  left[TErrorCode::GENERAL].messages.push_back("1");
+  left[TErrorCode::GENERAL].add_messages("1");
 
-  right[TErrorCode::GENERAL].messages.push_back("2");
-  right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p");
-  right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 3;
+  right[TErrorCode::GENERAL].add_messages("2");
+  right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].add_messages("p");
+  right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].set_count(3);
 
   MergeErrorMaps(right, &left);
   ASSERT_EQ(2, left.size());
-  ASSERT_EQ(2, left[TErrorCode::GENERAL].messages.size());
+  ASSERT_EQ(2, left[TErrorCode::GENERAL].messages_size());
 
   right = ErrorLogMap();
-  right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p");
-  right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 3;
+  right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].add_messages("p");
+  right[TErrorCode::PARQUET_MULTIPLE_BLOCKS].set_count(3);
 
   MergeErrorMaps(right, &left);
   ASSERT_EQ(2, left.size());
-  ASSERT_EQ(2, left[TErrorCode::GENERAL].messages.size());
-  ASSERT_EQ(6, left[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
+  ASSERT_EQ(2, left[TErrorCode::GENERAL].messages_size());
+  ASSERT_EQ(6, left[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count());
 
   ErrorLogMap dummy, cleared;
-  dummy[TErrorCode::GENERAL].messages.push_back("2");
-  dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p");
-  dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 3;
+  dummy[TErrorCode::GENERAL].add_messages("2");
+  dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].add_messages("p");
+  dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].set_count(3);
   ASSERT_EQ(2, dummy.size());
-  ASSERT_EQ(3, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
-  ASSERT_EQ(1, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size());
-  ASSERT_EQ(1, dummy[TErrorCode::GENERAL].messages.size());
-  cleared[TErrorCode::GENERAL].messages.push_back("1");
-  cleared[TErrorCode::RPC_RECV_TIMEOUT].messages.push_back("p");
+  ASSERT_EQ(3, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count());
+  ASSERT_EQ(1, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages_size());
+  ASSERT_EQ(1, dummy[TErrorCode::GENERAL].messages_size());
+  cleared[TErrorCode::GENERAL].add_messages("1");
+  cleared[TErrorCode::RPC_RECV_TIMEOUT].add_messages("p");
   ClearErrorMap(cleared);
   ASSERT_EQ(2, cleared.size());
   ASSERT_EQ(1, cleared.count(TErrorCode::RPC_RECV_TIMEOUT));
 
   MergeErrorMaps(cleared, &dummy);
   ASSERT_EQ(3, dummy.size());
-  ASSERT_EQ(3, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
-  ASSERT_EQ(1, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size());
+  ASSERT_EQ(3, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count());
+  ASSERT_EQ(1, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages_size());
   ASSERT_EQ(1, dummy.count(TErrorCode::RPC_RECV_TIMEOUT));
-  ASSERT_EQ(0, dummy[TErrorCode::RPC_RECV_TIMEOUT].count);
-  ASSERT_EQ(0, dummy[TErrorCode::RPC_RECV_TIMEOUT].messages.size());
-  ASSERT_EQ(0, dummy[TErrorCode::GENERAL].count);
-  ASSERT_EQ(1, dummy[TErrorCode::GENERAL].messages.size());
+  ASSERT_EQ(0, dummy[TErrorCode::RPC_RECV_TIMEOUT].count());
+  ASSERT_EQ(0, dummy[TErrorCode::RPC_RECV_TIMEOUT].messages_size());
+  ASSERT_EQ(0, dummy[TErrorCode::GENERAL].count());
+  ASSERT_EQ(1, dummy[TErrorCode::GENERAL].messages_size());
 
   MergeErrorMaps(dummy, &cleared);
   ASSERT_EQ(3, cleared.size());
-  ASSERT_EQ(3, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
-  ASSERT_EQ(1, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size());
+  ASSERT_EQ(3, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count());
+  ASSERT_EQ(1, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages_size());
   ASSERT_EQ(1, cleared.count(TErrorCode::RPC_RECV_TIMEOUT));
-  ASSERT_EQ(0, cleared[TErrorCode::RPC_RECV_TIMEOUT].count);
-  ASSERT_EQ(0, cleared[TErrorCode::RPC_RECV_TIMEOUT].messages.size());
-  ASSERT_EQ(0, cleared[TErrorCode::GENERAL].count);
-  ASSERT_EQ(1, cleared[TErrorCode::GENERAL].messages.size());
+  ASSERT_EQ(0, cleared[TErrorCode::RPC_RECV_TIMEOUT].count());
+  ASSERT_EQ(0, cleared[TErrorCode::RPC_RECV_TIMEOUT].messages_size());
+  ASSERT_EQ(0, cleared[TErrorCode::GENERAL].count());
+  ASSERT_EQ(1, cleared[TErrorCode::GENERAL].messages_size());
 }
 
 TEST(ErrorMsg, CountErrors) {
   ErrorLogMap m;
   ASSERT_EQ(0, ErrorCount(m));
-  m[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p");
-  m[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 999;
+  m[TErrorCode::PARQUET_MULTIPLE_BLOCKS].add_messages("p");
+  m[TErrorCode::PARQUET_MULTIPLE_BLOCKS].set_count(999);
   ASSERT_EQ(1, ErrorCount(m));
-  m[TErrorCode::GENERAL].messages.push_back("1");
-  m[TErrorCode::GENERAL].messages.push_back("2");
+  m[TErrorCode::GENERAL].add_messages("1");
+  m[TErrorCode::GENERAL].add_messages("2");
   ASSERT_EQ(3, ErrorCount(m));
   ClearErrorMap(m);
   ASSERT_EQ(1, ErrorCount(m));
@@ -124,11 +126,13 @@ TEST(ErrorMsg, AppendError) {
 
 TEST(ErrorMsg, PrintMap) {
   ErrorLogMap left;
-  left[TErrorCode::GENERAL].messages.push_back("1");
-  left[TErrorCode::GENERAL].messages.push_back("2");
-  left[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.push_back("p");
-  left[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count = 999;
-  ASSERT_EQ("1\n2\np (1 of 999 similar)\n", PrintErrorMapToString(left));
+  left[TErrorCode::GENERAL].add_messages("1");
+  left[TErrorCode::GENERAL].add_messages("2");
+  left[TErrorCode::GENERAL].add_messages("3");
+  left[TErrorCode::PARQUET_MULTIPLE_BLOCKS].add_messages("p");
+  left[TErrorCode::PARQUET_MULTIPLE_BLOCKS].set_count(999);
+  const string msg = PrintErrorMapToString(left);
+  ASSERT_EQ("1\n2\n3\np (1 of 999 similar)\n", msg);
   ClearErrorMap(left);
   ASSERT_EQ("", PrintErrorMapToString(left));
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/util/error-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/error-util.cc b/be/src/util/error-util.cc
index 445833f..34f0356 100644
--- a/be/src/util/error-util.cc
+++ b/be/src/util/error-util.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "util/error-util.h"
+#include "util/error-util-internal.h"
 
 #include <errno.h>
 #include <string.h>
@@ -140,20 +140,20 @@ ErrorMsg ErrorMsg::Init(TErrorCode::type error, const ArgType& arg0,
 
 void PrintErrorMap(ostream* stream, const ErrorLogMap& errors) {
   for (const ErrorLogMap::value_type& v: errors) {
-    const TErrorLogEntry& log_entry = v.second;
+    const ErrorLogEntryPB& entry = v.second;
     if (v.first == TErrorCode::GENERAL) {
-      DCHECK_EQ(log_entry.count, 0);
-      for (const string& s: log_entry.messages) {
-        *stream << s << "\n";
+      DCHECK_EQ(entry.count(), 0);
+      for (auto& msg : entry.messages()) {
+        *stream << msg << "\n";
       }
-    } else if (!log_entry.messages.empty()) {
-      DCHECK_GT(log_entry.count, 0);
-      DCHECK_EQ(log_entry.messages.size(), 1);
-      *stream << log_entry.messages.front();
-      if (log_entry.count == 1) {
+    } else if (entry.messages_size() > 0) {
+      DCHECK_GT(entry.count(), 0);
+      DCHECK_EQ(entry.messages_size(), 1);
+      *stream << *(entry.messages().begin());
+      if (entry.count() == 1) {
         *stream << "\n";
       } else {
-        *stream << " (1 of " << log_entry.count << " similar)\n";
+        *stream << " (1 of " << entry.count() << " similar)\n";
       }
     }
   }
@@ -165,48 +165,56 @@ string PrintErrorMapToString(const ErrorLogMap& errors) {
   return stream.str();
 }
 
-void MergeErrorMaps(const ErrorLogMap& m1, ErrorLogMap* m2) {
-  for (const ErrorLogMap::value_type& v: m1) {
-    TErrorLogEntry& target = (*m2)[v.first];
-    const TErrorLogEntry& source = v.second;
-    // Append generic message, append specific codes or increment count if exists
-    if (v.first == TErrorCode::GENERAL) {
-      DCHECK_EQ(v.second.count, 0);
-      target.messages.insert(
-          target.messages.end(), source.messages.begin(), source.messages.end());
-    } else {
-      DCHECK_EQ(source.messages.empty(), source.count == 0);
-      if (target.messages.empty()) {
-        target.messages = source.messages;
-      }
-      target.count += source.count;
+void MergeErrorLogEntry(TErrorCode::type error_code, const ErrorLogEntryPB& entry,
+    ErrorLogMap* target_map) {
+  ErrorLogEntryPB* target = &(*target_map)[error_code];
+
+  // Append generic message, append specific codes or increment count if exists.
+  if (error_code == TErrorCode::GENERAL || target->messages_size() == 0) {
+    for (auto& msg : entry.messages()) {
+      target->add_messages(msg);
     }
   }
+  if (error_code != TErrorCode::GENERAL) {
+    target->set_count(target->count() + entry.count());
+  }
+}
+
+void MergeErrorMaps(const ErrorLogMapPB& m1, ErrorLogMap* m2) {
+  for (const ErrorLogMapPB::value_type& v : m1) {
+    MergeErrorLogEntry(static_cast<TErrorCode::type>(v.first), v.second, m2);
+  }
+}
+
+void MergeErrorMaps(const ErrorLogMap& m1, ErrorLogMap* m2) {
+  for (const ErrorLogMap::value_type& v : m1) {
+    MergeErrorLogEntry(v.first, v.second, m2);
+  }
 }
 
 void AppendError(ErrorLogMap* map, const ErrorMsg& e) {
-  TErrorLogEntry& target = (*map)[e.error()];
+  ErrorLogEntryPB* target = &(*map)[e.error()];
   if (e.error() == TErrorCode::GENERAL) {
-    target.messages.push_back(e.msg());
+    target->add_messages(e.msg());
   } else {
-    if (target.messages.empty()) {
-      target.messages.push_back(e.msg());
+    if (target->messages_size() == 0) {
+      target->add_messages(e.msg());
     }
-    ++target.count;
+    target->set_count(target->count() + 1);
   }
 }
 
 void ClearErrorMap(ErrorLogMap& errors) {
-  for (auto iter = errors.begin(); iter != errors.end(); ++iter) {
-    iter->second.messages.clear();
-    iter->second.count = 0;
+  for (auto& err : errors) {
+    err.second.mutable_messages()->Clear();
+    err.second.set_count(0);
   }
 }
 
 size_t ErrorCount(const ErrorLogMap& errors) {
   ErrorLogMap::const_iterator cit = errors.find(TErrorCode::GENERAL);
   if (cit == errors.end()) return errors.size();
-  return errors.size() + cit->second.messages.size() - 1;
+  return errors.size() + cit->second.messages_size() - 1;
 }
 
 string ErrorMsg::GetFullMessageDetails() const {

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/util/error-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/error-util.h b/be/src/util/error-util.h
index a93e6b7..2b0e280 100644
--- a/be/src/util/error-util.h
+++ b/be/src/util/error-util.h
@@ -141,36 +141,6 @@ private:
   std::vector<std::string> details_;
 };
 
-/// Track log messages per error code.
-typedef std::map<TErrorCode::type, TErrorLogEntry> ErrorLogMap;
-
-/// Merge error map m1 into m2. Merging of error maps occurs when the errors from
-/// multiple backends are merged into a single error map.  General log messages are
-/// simply appended, specific errors are deduplicated by either appending a new
-/// instance or incrementing the count of an existing one.
-void MergeErrorMaps(const ErrorLogMap& m1, ErrorLogMap* m2);
-
-/// Append an error to the error map. Performs the aggregation as follows: GENERAL errors
-/// are appended to the list of GENERAL errors, to keep one item each in the map, while
-/// for all other error codes only the count is incremented and only the first message
-/// is kept as a sample.
-void AppendError(ErrorLogMap* map, const ErrorMsg& e);
-
-/// Helper method to print the contents of an ErrorMap to a stream.
-void PrintErrorMap(std::ostream* stream, const ErrorLogMap& errors);
-
-/// Reset all messages and count, but keep all keys to prevent sending already reported
-/// general errors and counting the same non-general error multiple times.
-void ClearErrorMap(ErrorLogMap& errors);
-
-/// Return the number of errors within this error maps. General errors are counted
-/// individually, while specific errors are counted once per distinct occurrence.
-size_t ErrorCount(const ErrorLogMap& errors);
-
-/// Generate a string representation of the error map. Produces the same output as
-/// PrintErrorMap, but returns a string instead of using a stream.
-std::string PrintErrorMapToString(const ErrorLogMap& errors);
-
 /// Maps the HS2 TStatusCode types to the corresponding TErrorCode.
 TErrorCode::type HS2TStatusCodeToTErrorCode(
     const apache::hive::service::cli::thrift::TStatusCode::type& hs2Code);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index d7eba44..79885e5 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -250,6 +250,7 @@ void RuntimeProfile::Update(const TRuntimeProfileTree& thrift_profile) {
 }
 
 void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx) {
+  if (UNLIKELY(nodes.size()) == 0) return;
   DCHECK_LT(*idx, nodes.size());
   const TRuntimeProfileNode& node = nodes[*idx];
   {
@@ -826,7 +827,7 @@ Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
   const_cast<RuntimeProfile*>(this)->ToThrift(&thrift_object);
   ThriftSerializer serializer(true);
   vector<uint8_t> serialized_buffer;
-  RETURN_IF_ERROR(serializer.Serialize(&thrift_object, &serialized_buffer));
+  RETURN_IF_ERROR(serializer.SerializeToVector(&thrift_object, &serialized_buffer));
 
   // Compress the serialized thrift string.  This uses string keys and is very
   // easy to compress.

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/util/uid-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index e20af55..03b335a 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -24,6 +24,7 @@
 #include <boost/uuid/uuid_generators.hpp>
 
 #include "gen-cpp/Types_types.h"  // for TUniqueId
+#include "gen-cpp/control_service.pb.h"
 #include "util/debug-util.h"
 
 namespace impala {
@@ -55,6 +56,12 @@ inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, TUniqueId* unique_id
   memcpy(&(unique_id->lo), &uuid.data[8], 8);
 }
 
+inline void TUniqueIdToUniqueIdPB(
+    const TUniqueId& t_unique_id, UniqueIdPB* unique_id_pb) {
+  unique_id_pb->set_lo(t_unique_id.lo);
+  unique_id_pb->set_hi(t_unique_id.hi);
+}
+
 /// Query id: uuid with bottom 4 bytes set to 0
 /// Fragment instance id: query id with instance index stored in the bottom 4 bytes
 
@@ -68,6 +75,14 @@ inline TUniqueId UuidToQueryId(const boost::uuids::uuid& uuid) {
   return result;
 }
 
+inline TUniqueId ProtoToQueryId(const UniqueIdPB& uid_pb) {
+  DCHECK(uid_pb.IsInitialized());
+  TUniqueId result;
+  result.hi = uid_pb.hi();
+  result.lo = uid_pb.lo();
+  return result;
+}
+
 inline TUniqueId GetQueryId(const TUniqueId& fragment_instance_id) {
   TUniqueId result = fragment_instance_id;
   result.lo &= ~FRAGMENT_IDX_MASK;  // zero out bottom 4 bytes
@@ -78,6 +93,10 @@ inline int32_t GetInstanceIdx(const TUniqueId& fragment_instance_id) {
   return fragment_instance_id.lo & FRAGMENT_IDX_MASK;
 }
 
+inline int32_t GetInstanceIdx(const UniqueIdPB& fragment_instance_id) {
+  return fragment_instance_id.lo() & FRAGMENT_IDX_MASK;
+}
+
 inline bool IsValidFInstanceId(const TUniqueId& fragment_instance_id) {
   return fragment_instance_id.hi != 0L;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/common/protobuf/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/protobuf/CMakeLists.txt b/common/protobuf/CMakeLists.txt
index 2faec7e..69a30a3 100644
--- a/common/protobuf/CMakeLists.txt
+++ b/common/protobuf/CMakeLists.txt
@@ -18,38 +18,38 @@
 
 cmake_minimum_required(VERSION 2.6)
 
-set(PROTOBUF_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp/)
-
-PROTOBUF_GENERATE_CPP(
-  COMMON_PROTO_SRCS COMMON_PROTO_HDRS COMMON_PROTO_TGTS
-  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
-  BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
-  PROTO_FILES common.proto)
-add_custom_target(common_proto DEPENDS ${COMMON_PROTO_TGTS})
-set(COMMON_PROTO_SRCS ${COMMON_PROTO_SRCS} PARENT_SCOPE)
-
-PROTOBUF_GENERATE_CPP(
-  ROW_BATCH_PROTO_SRCS ROW_BATCH_PROTO_HDRS ROW_BATCH_PROTO_TGTS
-  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
-  BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
-  PROTO_FILES row_batch.proto)
-add_custom_target(row_batch_proto DEPENDS ${ROW_BATCH_PROTO_TGTS})
-set(ROW_BATCH_PROTO_SRCS ${ROW_BATCH_PROTO_SRCS} PARENT_SCOPE)
+add_custom_target(proto-deps)
 
-KRPC_GENERATE(DATA_STREAM_SVC_PROTO_SRCS DATA_STREAM_SVC_PROTO_HDRS
-  DATA_STREAM_SVC_PROTO_TGTS
-  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
-  BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
-  PROTO_FILES data_stream_service.proto)
-add_custom_target(data_stream_svc_proto DEPENDS ${DATA_STREAM_SVC_PROTO_TGTS})
-set(DATA_STREAM_SVC_PROTO_SRCS ${DATA_STREAM_SVC_PROTO_SRCS} PARENT_SCOPE)
-
-KRPC_GENERATE(RPC_TEST_PROTO_SRCS RPC_TEST_PROTO_HDRS
-  RPC_TEST_PROTO_TGTS
-  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
-  BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
-  PROTO_FILES rpc_test.proto)
-add_custom_target(rpc_test_proto_tgt DEPENDS ${RPC_TEST_PROTO_TGTS})
-set(RPC_TEST_PROTO_SRCS ${RPC_TEST_PROTO_SRCS} PARENT_SCOPE)
+set(PROTOBUF_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp/)
 
-add_custom_target(proto-deps DEPENDS token_proto rpc_header_proto common_proto row_batch_proto data_stream_svc_proto)
+foreach(pb_src common row_batch)
+   string(TOUPPER ${pb_src} _PB_SRC_UPPER)
+   set(_PROTO_SRCS ${_PB_SRC_UPPER}_PROTO_SRCS)
+   set(_PROTO_HDRS ${_PB_SRC_UPPER}_PROTO_HDRS)
+   set(_PROTO_TGTS ${_PB_SRC_UPPER}_PROTO_TGTS)
+   set(_INPUT_SRC ${pb_src}.proto)
+   PROTOBUF_GENERATE_CPP(
+      ${_PROTO_SRCS} ${_PROTO_HDRS} ${_PROTO_TGTS}
+      SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
+      BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
+      PROTO_FILES ${_INPUT_SRC})
+   add_custom_target(${pb_src}_proto DEPENDS ${${_PROTO_TGTS}})
+   add_dependencies(proto-deps ${pb_src}_proto)
+   set(${_PROTO_SRCS} ${${_PROTO_SRCS}} PARENT_SCOPE)
+endforeach()
+
+foreach(pb_src data_stream_service control_service rpc_test)
+   string(TOUPPER ${pb_src} _PB_SRC_UPPER)
+   set(_PROTO_SRCS ${_PB_SRC_UPPER}_PROTO_SRCS)
+   set(_PROTO_HDRS ${_PB_SRC_UPPER}_PROTO_HDRS)
+   set(_PROTO_TGTS ${_PB_SRC_UPPER}_PROTO_TGTS)
+   set(_INPUT_SRC ${pb_src}.proto)
+   KRPC_GENERATE(
+      ${_PROTO_SRCS} ${_PROTO_HDRS} ${_PROTO_TGTS}
+      SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}
+      BINARY_ROOT ${PROTOBUF_OUTPUT_DIR}
+      PROTO_FILES ${_INPUT_SRC})
+   add_custom_target(${pb_src}_proto DEPENDS ${${_PROTO_TGTS}})
+   add_dependencies(proto-deps ${pb_src}_proto)
+   set(${_PROTO_SRCS} ${${_PROTO_SRCS}} PARENT_SCOPE)
+endforeach()

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/common/protobuf/common.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
index 17f9fc6..6c265a3 100644
--- a/common/protobuf/common.proto
+++ b/common/protobuf/common.proto
@@ -17,6 +17,8 @@
 
 // Common protobuf definitions.
 
+syntax="proto2";
+
 package impala;
 
 // Proto-serialized version of Impala's Status object.
@@ -27,8 +29,8 @@ message StatusPB {
 
 // 128-bit ID (equivalent to TUniqueID).
 message UniqueIdPB {
-  optional int64 hi = 1;
-  optional int64 lo = 2;
+  required fixed64 hi = 1;
+  required fixed64 lo = 2;
 }
 
 // The compression codec. Currently used in row batch's header to
@@ -36,4 +38,4 @@ message UniqueIdPB {
 enum CompressionType {
   NONE = 0; // No compression.
   LZ4 = 1;
-};
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/common/protobuf/control_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
new file mode 100644
index 0000000..cf64c11
--- /dev/null
+++ b/common/protobuf/control_service.proto
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+syntax="proto2";
+
+package impala;
+
+import "common.proto";
+
+import "kudu/rpc/rpc_header.proto";
+
+message ParquetDmlStatsPB {
+  // For each column, the on disk byte size
+  map<string, int64> per_column_size = 1;
+}
+
+message KuduDmlStatsPB {
+  // The number of reported per-row errors, i.e. this many rows were not modified.
+  // Note that this aggregate is less useful than a breakdown of the number of errors by
+  // error type, e.g. number of rows with duplicate key conflicts, number of rows
+  // violating nullability constraints, etc., but it isn't possible yet to differentiate
+  // all error types in the KuduTableSink yet.
+  optional int64 num_row_errors = 1;
+}
+
+// ReportExecStatus
+
+// Per partition DML stats
+// TODO: this should include the table stats that we update the metastore with.
+message DmlStatsPB {
+  optional int64 bytes_written = 1;
+
+  optional ParquetDmlStatsPB parquet_stats = 2;
+
+  optional KuduDmlStatsPB kudu_stats = 3;
+}
+
+// Per-partition statistics and metadata resulting from DML statements.
+message DmlPartitionStatusPB {
+  // The id of the partition written to (may be -1 if the partition is created by this
+  // query). See THdfsTable.partitions.
+  optional int64 id = 1;
+
+  // The number of rows modified in this partition
+  optional int64 num_modified_rows = 2;
+
+  // Detailed statistics gathered by table writers for this partition
+  optional DmlStatsPB stats = 3;
+
+  // Fully qualified URI to the base directory for this partition.
+  optional string partition_base_dir = 4;
+
+  // The latest observed Kudu timestamp reported by the local KuduSession.
+  // This value is an unsigned int64.
+  optional int64 kudu_latest_observed_ts = 5;
+}
+
+// The results of a DML statement, sent to the coordinator as part of
+// ReportExecStatusRequestPB
+message DmlExecStatusPB {
+  // A map from temporary absolute file path to final absolute destination. The
+  // coordinator performs these updates after the query completes.
+  map<string, string> files_to_move = 1;
+
+  // Per-partition details, used in finalization and reporting.
+  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
+  // root's key in an unpartitioned table being ROOT_PARTITION_KEY.
+  // The target table name is recorded in the corresponding TQueryExecRequest
+  map<string, DmlPartitionStatusPB> per_partition_status = 2;
+}
+
+// Error message exchange format
+message ErrorLogEntryPB {
+  // Number of error messages reported using the above identifier
+  optional int32 count = 1;
+
+  // Sample messages from the above error code
+  repeated string messages = 2;
+}
+
+// Represents the states that a fragment instance goes through during its execution. The
+// current state gets sent back to the coordinator and will be presented to users through
+// the debug webpages. The states are listed in the order to which they are transitioned.
+// Not all states are necessarily transitioned through when there are errors.
+enum FInstanceExecStatePB {
+  WAITING_FOR_EXEC = 0;
+  WAITING_FOR_PREPARE = 1;
+  WAITING_FOR_CODEGEN = 2;
+  WAITING_FOR_OPEN = 3;
+  WAITING_FOR_FIRST_BATCH = 4;
+  FIRST_BATCH_PRODUCED = 5;
+  PRODUCING_DATA = 6;
+  LAST_BATCH_SENT = 7;
+  FINISHED = 8;
+}
+
+message FragmentInstanceExecStatusPB {
+  // Sequence number prevents out-of-order or duplicated updates from being applied.
+  optional int64 report_seq_no = 1;
+
+  // The ID of the fragment instance which this report contains
+  optional UniqueIdPB fragment_instance_id = 2;
+
+  // Status of fragment execution; any error status means it's done.
+  optional StatusPB status = 3;
+
+  // If true, fragment finished executing.
+  optional bool done = 4;
+
+  // The current state of this fragment instance's execution.
+  optional FInstanceExecStatePB current_state = 5;
+
+  // Cumulative structural changes made by the table sink of this fragment
+  // instance. This is sent only when 'done' above is true. Not idempotent.
+  optional DmlExecStatusPB dml_exec_status = 6;
+
+  // Map of TErrorCode to ErrorLogEntryPB; New errors that have not been reported to
+  // the coordinator by this fragment instance. Not idempotent.
+  map<int32, ErrorLogEntryPB> error_log = 7;
+}
+
+message ReportExecStatusRequestPB {
+  // The query id which this report is for.
+  optional UniqueIdPB query_id = 1;
+
+  // same as TExecQueryFInstancesParams.coord_state_idx
+  optional int32 coord_state_idx = 2;
+
+  repeated FragmentInstanceExecStatusPB instance_exec_status = 3;
+
+  // Sidecar index of the cumulative profiles of all fragment instances
+  // in instance_exec_status.
+  optional int32 thrift_profiles_sidecar_idx = 4;
+
+  // Cumulative status for this backend. A backend can have an error from a specific
+  // fragment instance, or it can have a general error that is independent of any
+  // individual fragment. If reporting a single error, this status is always set to
+  // the error being reported. If reporting multiple errors, the status is set by the
+  // following rules:
+  // 1. A general error takes precedence over any fragment instance error.
+  // 2. Any fragment instance error takes precedence over any cancelled status.
+  // 3. If multiple fragments have errors, prefer the error that comes first in the
+  // 'instance_exec_status' list.
+  // This status is only OK if all fragment instances included are OK.
+  optional StatusPB status = 5;
+}
+
+message ReportExecStatusResponsePB {
+  optional StatusPB status = 1;
+}
+
+service ControlService {
+  // Override the default authorization method.
+  option (kudu.rpc.default_authz_method) = "Authorize";
+
+  rpc ReportExecStatus(ReportExecStatusRequestPB) returns (ReportExecStatusResponsePB);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/common/protobuf/data_stream_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index 68c2e90..b0e2b5d 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -16,6 +16,8 @@
 // under the License.
 //
 
+syntax="proto2";
+
 package impala;
 
 import "common.proto";

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/common/protobuf/row_batch.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/row_batch.proto b/common/protobuf/row_batch.proto
index 5aef55b..e35e5fa 100644
--- a/common/protobuf/row_batch.proto
+++ b/common/protobuf/row_batch.proto
@@ -16,6 +16,8 @@
 // under the License.
 //
 
+syntax="proto2";
+
 package impala;
 
 import "common.proto";

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/common/protobuf/rpc_test.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/rpc_test.proto b/common/protobuf/rpc_test.proto
index 70782e5..ead55f4 100644
--- a/common/protobuf/rpc_test.proto
+++ b/common/protobuf/rpc_test.proto
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 //
+
+syntax="proto2";
+
 package impala;
 
 import "kudu/rpc/rpc_header.proto";


[4/4] impala git commit: IMPALA-7586: fix predicate pushdown of escaped strings

Posted by ta...@apache.org.
IMPALA-7586: fix predicate pushdown of escaped strings

This fixes a class of bugs where the planner incorrectly uses the raw
string from the parser instead of the unescaped string. This occurs in
several places that push predicates down to the storage layer:
* Kudu scans
* HBase scans
* Data source scans

There are some more complex issues with escapes and the LIKE predicate
that are tracked separately by IMPALA-2422.

This also uncovered a different issue with RCFiles that is tracked by
IMPALA-7778 and is worked around by the tests added.

In order to make bugs like this more obvious in future, I renamed
getValue() to getValueWithOriginalEscapes().

Testing:
Added regression test that tests handling of backslash escapes on all
file formats. I did not add a regression test for the data source bug
since it seems to require some major modification of the data source
test infrastructure.

Change-Id: I53d6e20dd48ab6837ddd325db8a9d49ee04fed28
Reviewed-on: http://gerrit.cloudera.org:8080/11814
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/95b56d0e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/95b56d0e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/95b56d0e

Branch: refs/heads/master
Commit: 95b56d0e2d8232d8707603c360b98a35bb80ff3a
Parents: 5391100
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Oct 26 16:50:22 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Nov 1 21:27:13 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/AdminFnStmt.java |  2 +-
 .../java/org/apache/impala/analysis/Expr.java   |  2 +-
 .../apache/impala/analysis/ExtractFromExpr.java |  6 +-
 .../apache/impala/analysis/LikePredicate.java   |  5 +-
 .../apache/impala/analysis/StringLiteral.java   |  8 ++-
 .../impala/planner/DataSourceScanNode.java      |  3 +-
 .../apache/impala/planner/HBaseScanNode.java    |  4 +-
 .../org/apache/impala/planner/KuduScanNode.java |  4 +-
 testdata/data/README                            |  3 +
 testdata/data/strings_with_quotes.csv           | 11 ++++
 .../functional/functional_schema_template.sql   | 28 +++++++++
 .../datasets/functional/schema_constraints.csv  |  1 +
 .../QueryTest/string-escaping-rcfile-bug.test   | 66 ++++++++++++++++++++
 .../queries/QueryTest/string-escaping.test      | 62 ++++++++++++++++++
 tests/query_test/test_scanners.py               |  8 +++
 15 files changed, 199 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java b/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
index 2f2eb2e..1e1f022 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AdminFnStmt.java
@@ -117,7 +117,7 @@ public class AdminFnStmt extends StatementBase {
         throw new AnalysisException(
             "Invalid backend, must be a string literal: " + backendExpr.toSql());
       }
-      backend_ = parseBackendAddress(((StringLiteral) backendExpr).getValue());
+      backend_ = parseBackendAddress(((StringLiteral) backendExpr).getUnescapedValue());
     }
     if (deadlineExpr != null) {
       deadlineSecs_ = deadlineExpr.evalToNonNegativeInteger(analyzer, "deadline");

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 912ec8a..fdf639e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -1486,7 +1486,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         return DEFAULT_AVG_STRING_LENGTH;
       }
     } else if (e instanceof StringLiteral) {
-      return ((StringLiteral) e).getValue().length();
+      return ((StringLiteral) e).getUnescapedValue().length();
     } else {
       // TODO(tmarshall): Extend this to support other string Exprs, such as
       // function calls that return string.

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/fe/src/main/java/org/apache/impala/analysis/ExtractFromExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ExtractFromExpr.java b/fe/src/main/java/org/apache/impala/analysis/ExtractFromExpr.java
index 40020e0..9f46187 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ExtractFromExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ExtractFromExpr.java
@@ -20,7 +20,6 @@ package org.apache.impala.analysis;
 import java.util.Set;
 
 import org.apache.impala.catalog.BuiltinsDb;
-import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TExtractField;
@@ -81,7 +80,8 @@ public class ExtractFromExpr extends FunctionCallExpr {
 
     super.analyzeImpl(analyzer);
 
-    String extractFieldIdent = ((StringLiteral)children_.get(1)).getValue();
+    String extractFieldIdent =
+        ((StringLiteral)children_.get(1)).getValueWithOriginalEscapes();
     Preconditions.checkNotNull(extractFieldIdent);
     if (!EXTRACT_FIELDS.contains(extractFieldIdent.toUpperCase())) {
       throw new AnalysisException("Time unit '" + extractFieldIdent + "' in expression '"
@@ -102,7 +102,7 @@ public class ExtractFromExpr extends FunctionCallExpr {
     StringBuilder strBuilder = new StringBuilder();
     strBuilder.append(getFnName().toString().toUpperCase());
     strBuilder.append("(");
-    strBuilder.append(((StringLiteral)getChild(1)).getValue());
+    strBuilder.append(((StringLiteral)getChild(1)).getValueWithOriginalEscapes());
     strBuilder.append(" FROM ");
     strBuilder.append(getChild(0).toSql());
     strBuilder.append(")");

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java b/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
index bc10b2a..3830c95 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
@@ -135,7 +135,7 @@ public class LikePredicate extends Predicate {
       // TODO: this checks that it's a Java-supported regex, but the syntax supported
       // by the backend is Posix; add a call to the backend to check the re syntax
       try {
-        Pattern.compile(((StringLiteral) getChild(1)).getValue());
+        Pattern.compile(((StringLiteral) getChild(1)).getValueWithOriginalEscapes());
       } catch (PatternSyntaxException e) {
         throw new AnalysisException(
             "invalid regular expression in '" + this.toSql() + "'");
@@ -148,7 +148,8 @@ public class LikePredicate extends Predicate {
   protected float computeEvalCost() {
     if (!hasChildCosts()) return UNKNOWN_COST;
     if (getChild(1).isLiteral() && !getChild(1).isNullLiteral() &&
-      Pattern.matches("[%_]*[^%_]*[%_]*", ((StringLiteral) getChild(1)).getValue())) {
+      Pattern.matches("[%_]*[^%_]*[%_]*",
+          ((StringLiteral) getChild(1)).getValueWithOriginalEscapes())) {
       // This pattern only has wildcards as leading or trailing character,
       // so it is linear.
       return getChildCosts() +

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java b/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
index 5c51c45..12ad635 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
@@ -79,7 +79,11 @@ public class StringLiteral extends LiteralExpr {
     msg.string_literal = new TStringLiteral(val);
   }
 
-  public String getValue() { return value_; }
+  /**
+   * Returns the original value that the string literal was constructed with,
+   * without escaping or unescaping it.
+   */
+  public String getValueWithOriginalEscapes() { return value_; }
 
   public String getUnescapedValue() {
     // Unescape string exactly like Hive does. Hive's method assumes
@@ -126,7 +130,7 @@ public class StringLiteral extends LiteralExpr {
 
   @Override
   public String getStringValue() {
-    return value_;
+    return getValueWithOriginalEscapes();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index a41630b..1ddb394 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -129,7 +129,8 @@ public class DataSourceScanNode extends ScanNode {
         return new TColumnValue().setDouble_val(
             ((NumericLiteral) expr).getDoubleValue());
       case STRING:
-        return new TColumnValue().setString_val(((StringLiteral) expr).getValue());
+        return new TColumnValue().setString_val(
+            ((StringLiteral) expr).getUnescapedValue());
       case DECIMAL:
       case DATE:
       case DATETIME:

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index 06aca1f..13ecb6a 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -166,7 +166,7 @@ public class HBaseScanNode extends ScanNode {
             analyzer.getQueryCtx());
         if (val instanceof StringLiteral) {
           StringLiteral litVal = (StringLiteral) val;
-          startKey_ = convertToBytes(litVal.getStringValue(),
+          startKey_ = convertToBytes(litVal.getUnescapedValue(),
               !rowRange.getLowerBoundInclusive());
         } else {
           // lower bound is null.
@@ -182,7 +182,7 @@ public class HBaseScanNode extends ScanNode {
             analyzer.getQueryCtx());
         if (val instanceof StringLiteral) {
           StringLiteral litVal = (StringLiteral) val;
-          stopKey_ = convertToBytes(litVal.getStringValue(),
+          stopKey_ = convertToBytes(litVal.getUnescapedValue(),
               rowRange.getUpperBoundInclusive());
         } else {
           // lower bound is null.

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 56c7602..adeaa72 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -415,7 +415,7 @@ public class KuduScanNode extends ScanNode {
       case VARCHAR:
       case CHAR: {
         kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
-            ((StringLiteral)literal).getStringValue());
+            ((StringLiteral)literal).getUnescapedValue());
         break;
       }
       case TIMESTAMP: {
@@ -523,7 +523,7 @@ public class KuduScanNode extends ScanNode {
       case BIGINT: return ((NumericLiteral) e).getLongValue();
       case FLOAT: return (float) ((NumericLiteral) e).getDoubleValue();
       case DOUBLE: return ((NumericLiteral) e).getDoubleValue();
-      case STRING: return ((StringLiteral) e).getValue();
+      case STRING: return ((StringLiteral) e).getUnescapedValue();
       case TIMESTAMP: {
         try {
           // TODO: Simplify when Impala supports a 64-bit TIMESTAMP type.

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index 599b009..db055c7 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -230,3 +230,6 @@ valid range [0..24H). Before the fix, select * returned these values:
 1970-01-01 00:00:00
 1970-01-01 23:59:59.999999999
 1970-01-01 24:00:00 (invalid - time of day should be less than a whole day)
+
+strings_with_quotes.csv:
+Various strings with quotes in them to reproduce bugs like IMPALA-7586.

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/testdata/data/strings_with_quotes.csv
----------------------------------------------------------------------
diff --git a/testdata/data/strings_with_quotes.csv b/testdata/data/strings_with_quotes.csv
new file mode 100644
index 0000000..0d82f03
--- /dev/null
+++ b/testdata/data/strings_with_quotes.csv
@@ -0,0 +1,11 @@
+",1
+"",2
+\\",3
+'',4
+',5
+foo',6
+'foo,7
+"foo",8
+"foo,9
+foo"bar,10
+foo\\"bar,11

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/testdata/datasets/functional/functional_schema_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index cfd9030..0712b7c 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2194,3 +2194,31 @@ CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (i1 integer)
 STORED AS {file_format}
 TBLPROPERTIES('skip.header.line.count'='2');
 ====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+strings_with_quotes
+---- COLUMNS
+s string
+i int
+---- ROW_FORMAT
+delimited fields terminated by ','  escaped by '\\'
+---- LOAD
+LOAD DATA LOCAL INPATH '{impala_home}/testdata/data/strings_with_quotes.csv'
+OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
+---- DEPENDENT_LOAD
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name}
+SELECT s, i
+FROM {db_name}.{table_name};
+---- CREATE_KUDU
+DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name};
+CREATE TABLE {db_name}{db_suffix}.{table_name} (
+  s string PRIMARY KEY,
+  i int
+)
+PARTITION BY HASH (s) PARTITIONS 3 STORED AS KUDU;
+---- DEPENDENT_LOAD_KUDU
+INSERT into TABLE {db_name}{db_suffix}.{table_name}
+SELECT s, i
+FROM {db_name}.{table_name};
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/testdata/datasets/functional/schema_constraints.csv
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index baf0306..e09ca36 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -193,6 +193,7 @@ table_name:nulltable, constraint:only, table_format:kudu/none/none
 table_name:nullescapedtable, constraint:only, table_format:kudu/none/none
 table_name:decimal_tbl, constraint:only, table_format:kudu/none/none
 table_name:decimal_tiny, constraint:only, table_format:kudu/none/none
+table_name:strings_with_quotes, constraint:only, table_format:kudu/none/none
 
 # Skipping header lines is only effective with text tables
 table_name:table_with_header, constraint:restrict_to, table_format:text/none/none

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/testdata/workloads/functional-query/queries/QueryTest/string-escaping-rcfile-bug.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/string-escaping-rcfile-bug.test b/testdata/workloads/functional-query/queries/QueryTest/string-escaping-rcfile-bug.test
new file mode 100644
index 0000000..d757480
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/string-escaping-rcfile-bug.test
@@ -0,0 +1,66 @@
+# This file contains the current output for queries against strings_with_quotes
+# for RCFile. The output is different because of a bug in the RCFile scanner.
+# This file can be removed once IMPALA-7778 is fixed.
+====
+---- QUERY
+# Check that all strings in the table are returned correctly.
+# IMPALA-7778: escapes are ignored so output is incorrect
+select s
+from strings_with_quotes
+---- RESULTS
+'"'
+'""'
+'\\\\"'
+''''''
+''''
+'foo'''
+'''foo'
+'"foo"'
+'"foo'
+'foo"bar'
+'foo\\\\"bar'
+---- TYPES
+STRING
+====
+---- QUERY
+# Regression test for IMPALA-7586: predicate pushed down with incorrect string escaping.
+select s
+from strings_with_quotes
+where s = '"'
+---- RESULTS
+'"'
+---- TYPES
+STRING
+====
+---- QUERY
+# Regression test for IMPALA-7586: predicate pushed down with incorrect string escaping.
+# IMPALA-7778: escapes are ignored so output is incorrect
+select s
+from strings_with_quotes
+where s = '\\"'
+---- RESULTS
+---- TYPES
+STRING
+====
+---- QUERY
+# Regression test for IMPALA-7586: predicate pushed down with incorrect string escaping.
+select s
+from strings_with_quotes
+where s in ('"', 'foo"bar')
+---- RESULTS
+'"'
+'foo"bar'
+---- TYPES
+STRING
+====
+---- QUERY
+# Regression test for IMPALA-7586: predicate pushed down with incorrect string escaping.
+# IMPALA-7778: escapes are ignored so output is incorrect
+select s
+from strings_with_quotes
+where s in ('\\"', 'foo"bar')
+---- RESULTS
+'foo"bar'
+---- TYPES
+STRING
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/testdata/workloads/functional-query/queries/QueryTest/string-escaping.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/string-escaping.test b/testdata/workloads/functional-query/queries/QueryTest/string-escaping.test
new file mode 100644
index 0000000..a2f2c5d
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/string-escaping.test
@@ -0,0 +1,62 @@
+====
+---- QUERY
+# Check that all strings in the table are returned correctly.
+select s
+from strings_with_quotes
+---- RESULTS
+'"'
+'""'
+'\\"'
+''''''
+''''
+'foo'''
+'''foo'
+'"foo"'
+'"foo'
+'foo"bar'
+'foo\\"bar'
+---- TYPES
+STRING
+====
+---- QUERY
+# Regression test for IMPALA-7586: predicate pushed down with incorrect string escaping.
+select s
+from strings_with_quotes
+where s = '"'
+---- RESULTS
+'"'
+---- TYPES
+STRING
+====
+---- QUERY
+# Regression test for IMPALA-7586: predicate pushed down with incorrect string escaping.
+select s
+from strings_with_quotes
+where s = '\\"'
+---- RESULTS
+'\\"'
+---- TYPES
+STRING
+====
+---- QUERY
+# Regression test for IMPALA-7586: predicate pushed down with incorrect string escaping.
+select s
+from strings_with_quotes
+where s in ('"', 'foo"bar')
+---- RESULTS
+'"'
+'foo"bar'
+---- TYPES
+STRING
+====
+---- QUERY
+# Regression test for IMPALA-7586: predicate pushed down with incorrect string escaping.
+select s
+from strings_with_quotes
+where s in ('\\"', 'foo"bar')
+---- RESULTS
+'\\"'
+'foo"bar'
+---- TYPES
+STRING
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/95b56d0e/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 91f0449..f4ddad4 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -97,6 +97,14 @@ class TestScannersAllTableFormats(ImpalaTestSuite):
       pytest.skip()
     self.run_test_case('QueryTest/hdfs_scanner_profile', vector)
 
+  def test_string_escaping(self, vector):
+    """Test handling of string escape sequences."""
+    if vector.get_value('table_format').file_format == 'rc':
+      # IMPALA-7778: RCFile scanner incorrectly ignores escapes for now.
+      self.run_test_case('QueryTest/string-escaping-rcfile-bug', vector)
+    else:
+      self.run_test_case('QueryTest/string-escaping', vector)
+
 # Test all the scanners with a simple limit clause. The limit clause triggers
 # cancellation in the scanner code paths.
 class TestScannersAllTableFormatsWithLimit(ImpalaTestSuite):


[3/4] impala git commit: IMPALA-7213, IMPALA-7241: Port ReportExecStatus() RPC to use KRPC

Posted by ta...@apache.org.
IMPALA-7213, IMPALA-7241: Port ReportExecStatus() RPC to use KRPC

This change converts ReportExecStatus() RPC from thrift
based RPC to KRPC. This is done in part of the preparation
for fixing IMPALA-2990 as we can take advantage of TCP connection
multiplexing in KRPC to avoid overwhelming the coordinator
with too many connections by reducing the number of TCP connection
to one for each executor.

This patch also introduces a new service pool for all query execution
control related RPCs in the future so that control commands from
coordinators aren't blocked by long-running DataStream services' RPCs.
To avoid unnecessary delays due to sharing the network connections
between DataStream service and Control service, this change added the
service name as part of the user credentials for the ConnectionId
so each service will use a separate connection.

The majority of this patch is mechanical conversion of some Thrift
structures used in ReportExecStatus() RPC to Protobuf. Note that the
runtime profile is still retained as a Thrift structure as Impala
clients will still fetch query profiles using Thrift RPCs. This also
avoids duplicating the serialization implementation in both Thrift
and Protobuf for the runtime profile. The Thrift runtime profiles
are serialized and sent as a sidecar in ReportExecStatus() RPC.

This patch also fixes IMPALA-7241 which may lead to duplicated
dml stats being applied. The fix is by adding a monotonically
increasing version number for fragment instances' reports. The
coordinator will ignore any report smaller than or equal to the
version in the last report.

Testing done:
1. Exhaustive build.
2. Added some targeted test cases for profile serialization failure
   and RPC retries/timeout.

Change-Id: I7638583b433dcac066b87198e448743d90415ebe
Reviewed-on: http://gerrit.cloudera.org:8080/10855
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/5391100c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/5391100c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/5391100c

Branch: refs/heads/master
Commit: 5391100c7eeb33193de7861e761c3920f1d1eecc
Parents: e3a7027
Author: Michael Ho <kw...@cloudera.com>
Authored: Tue May 22 16:38:03 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Nov 1 21:12:12 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc         |   2 +-
 be/src/catalog/catalog-util.cc              |   2 +-
 be/src/common/global-flags.cc               |   1 +
 be/src/exec/data-sink.cc                    |   3 +
 be/src/exec/data-sink.h                     |   3 +
 be/src/exec/hbase-table-sink.cc             |   3 -
 be/src/exec/hdfs-parquet-table-writer.cc    |  27 +--
 be/src/exec/hdfs-parquet-table-writer.h     |   8 +-
 be/src/exec/hdfs-table-sink.cc              |   7 +-
 be/src/exec/hdfs-table-writer.cc            |   2 +-
 be/src/exec/hdfs-table-writer.h             |   7 +-
 be/src/rpc/CMakeLists.txt                   |  14 +-
 be/src/rpc/jni-thrift-util.h                |   2 +-
 be/src/rpc/rpc-mgr-kerberized-test.cc       |   8 +-
 be/src/rpc/rpc-mgr-test.cc                  |   7 +-
 be/src/rpc/rpc-mgr-test.h                   |  33 +++-
 be/src/rpc/rpc-mgr.h                        |   6 +-
 be/src/rpc/thrift-util-test.cc              |  17 +-
 be/src/rpc/thrift-util.h                    |   8 +-
 be/src/runtime/backend-client.h             |  10 --
 be/src/runtime/coordinator-backend-state.cc |  83 +++++----
 be/src/runtime/coordinator-backend-state.h  |  26 ++-
 be/src/runtime/coordinator.cc               |  22 ++-
 be/src/runtime/coordinator.h                |  36 ++--
 be/src/runtime/dml-exec-state.cc            | 177 ++++++++++---------
 be/src/runtime/dml-exec-state.h             |  24 +--
 be/src/runtime/exec-env.cc                  |   5 +-
 be/src/runtime/exec-env.h                   |   4 +-
 be/src/runtime/fragment-instance-state.cc   |  70 ++++----
 be/src/runtime/fragment-instance-state.h    |  38 +++--
 be/src/runtime/krpc-data-stream-sender.cc   |   6 +-
 be/src/runtime/query-state.cc               | 207 +++++++++++++++--------
 be/src/runtime/query-state.h                |  24 ++-
 be/src/runtime/runtime-state.cc             |  12 +-
 be/src/runtime/runtime-state.h              |  11 +-
 be/src/runtime/test-env.cc                  |  22 ++-
 be/src/scheduling/admission-controller.cc   |   2 +-
 be/src/scheduling/scheduler-test-util.cc    |   2 +-
 be/src/service/CMakeLists.txt               |   7 +-
 be/src/service/client-request-state.cc      |   4 +-
 be/src/service/client-request-state.h       |  15 +-
 be/src/service/control-service.cc           | 157 +++++++++++++++++
 be/src/service/control-service.h            |  83 +++++++++
 be/src/service/data-stream-service.cc       |  21 ++-
 be/src/service/data-stream-service.h        |  10 +-
 be/src/service/impala-internal-service.cc   |   8 -
 be/src/service/impala-internal-service.h    |   2 -
 be/src/service/impala-server.cc             |  32 +---
 be/src/service/impala-server.h              |   6 +-
 be/src/testutil/in-process-servers.cc       |  12 +-
 be/src/util/container-util.h                |   8 +-
 be/src/util/error-util-internal.h           |  70 ++++++++
 be/src/util/error-util-test.cc              |  82 ++++-----
 be/src/util/error-util.cc                   |  78 +++++----
 be/src/util/error-util.h                    |  30 ----
 be/src/util/runtime-profile.cc              |   3 +-
 be/src/util/uid-util.h                      |  19 +++
 common/protobuf/CMakeLists.txt              |  66 ++++----
 common/protobuf/common.proto                |   8 +-
 common/protobuf/control_service.proto       | 172 +++++++++++++++++++
 common/protobuf/data_stream_service.proto   |   2 +
 common/protobuf/row_batch.proto             |   2 +
 common/protobuf/rpc_test.proto              |   3 +
 common/thrift/ImpalaInternalService.thrift  | 183 ++------------------
 tests/custom_cluster/test_rpc_timeout.py    |  38 +++--
 65 files changed, 1296 insertions(+), 766 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index 26cf252..5715cc1 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -76,7 +76,7 @@ class Planner {
     query_ctx.client_request.query_options = query_options_;
     query_ctx.__set_session(session_state_);
     TNetworkAddress dummy;
-    ImpalaServer::PrepareQueryContext(dummy, &query_ctx);
+    ImpalaServer::PrepareQueryContext(dummy, dummy, &query_ctx);
     runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_));
 
     return frontend_.GetExecRequest(query_ctx, result);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/catalog/catalog-util.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc
index 5084828..6344d4d 100644
--- a/be/src/catalog/catalog-util.cc
+++ b/be/src/catalog/catalog-util.cc
@@ -112,7 +112,7 @@ jobject CatalogUpdateResultIterator::next(JNIEnv* env) {
     ++pos_;
     uint8_t* buf;
     uint32_t buf_size;
-    Status s = serializer_.Serialize(current_obj, &buf_size, &buf);
+    Status s = serializer_.SerializeToBuffer(current_obj, &buf_size, &buf);
     if (!s.ok()) {
       LOG(ERROR) << "Error serializing catalog object: " << s.GetDetail();
       continue;

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 0473352..4c1941c 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -299,6 +299,7 @@ REMOVED_FLAG(rpc_cnxn_attempts);
 REMOVED_FLAG(rpc_cnxn_retry_interval_ms);
 REMOVED_FLAG(skip_lzo_version_check);
 REMOVED_FLAG(staging_cgroup);
+REMOVED_FLAG(status_report_interval);
 REMOVED_FLAG(suppress_unknown_disk_id_warnings);
 REMOVED_FLAG(use_statestore);
 REMOVED_FLAG(use_kudu_kinit);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 5e0fc3c..4071f43 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -45,6 +45,9 @@ using strings::Substitute;
 
 namespace impala {
 
+// Empty string
+const char* const DataSink::ROOT_PARTITION_KEY = "";
+
 DataSink::DataSink(const RowDescriptor* row_desc, const string& name, RuntimeState* state)
   : closed_(false), row_desc_(row_desc), name_(name) {
   profile_ = RuntimeProfile::Create(state->obj_pool(), name);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index d4f2040..cc5e6ba 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -96,6 +96,9 @@ class DataSink {
   }
   bool is_closed() const { return closed_; }
 
+  /// Default partition key when none is specified.
+  static const char* const ROOT_PARTITION_KEY;
+
  protected:
   /// Set to true after Close() has been called. Subclasses should check and set this in
   /// Close().

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index 567e8bd..4f54d6e 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -31,9 +31,6 @@
 
 namespace impala {
 
-const static string& ROOT_PARTITION_KEY =
-    g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
-
 HBaseTableSink::HBaseTableSink(const RowDescriptor* row_desc, const TDataSink& tsink,
     RuntimeState* state)
   : DataSink(row_desc, "HBaseTableSink", state),

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 8a8add6..dd60efd 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -682,7 +682,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
 
     uint8_t* header_buffer;
     uint32_t header_len;
-    RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
+    RETURN_IF_ERROR(parent_->thrift_serializer_->SerializeToBuffer(
         &header, &header_len, &header_buffer));
     RETURN_IF_ERROR(parent_->Write(header_buffer, header_len));
     *file_pos += header_len;
@@ -719,7 +719,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     uint8_t* buffer = nullptr;
     uint32_t len = 0;
     RETURN_IF_ERROR(
-        parent_->thrift_serializer_->Serialize(&page.header, &len, &buffer));
+        parent_->thrift_serializer_->SerializeToBuffer(&page.header, &len, &buffer));
     RETURN_IF_ERROR(parent_->Write(buffer, len));
     *file_pos += len;
 
@@ -814,7 +814,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   // Add the size of the data page header
   uint8_t* header_buffer;
   uint32_t header_len = 0;
-  RETURN_IF_ERROR(parent_->thrift_serializer_->Serialize(
+  RETURN_IF_ERROR(parent_->thrift_serializer_->SerializeToBuffer(
       &current_page_->header, &header_len, &header_buffer));
 
   current_page_->finalized = true;
@@ -1152,7 +1152,7 @@ Status HdfsParquetTableWriter::Finalize() {
   RETURN_IF_ERROR(WritePageIndex());
   for (auto& column : columns_) column->Reset();
   RETURN_IF_ERROR(WriteFileFooter());
-  stats_.__set_parquet_stats(parquet_insert_stats_);
+  *stats_.mutable_parquet_stats() = parquet_dml_stats_;
   COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
   return Status::OK();
 }
@@ -1200,8 +1200,9 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     current_row_group_->num_rows = col_writer->num_values();
     current_row_group_->columns[i].file_offset = file_pos_;
     const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    parquet_insert_stats_.per_column_size[col_name] +=
-        col_writer->total_compressed_size();
+    google::protobuf::Map<string,int64>* column_size_map =
+        parquet_dml_stats_.mutable_per_column_size();
+    (*column_size_map)[col_name] += col_writer->total_compressed_size();
 
     // Write encodings and encoding stats for this column
     col_metadata.encodings.clear();
@@ -1241,8 +1242,8 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     // written without buffering.
     uint8_t* buffer = nullptr;
     uint32_t len = 0;
-    RETURN_IF_ERROR(
-        thrift_serializer_->Serialize(&current_row_group_->columns[i], &len, &buffer));
+    RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer(
+        &current_row_group_->columns[i], &len, &buffer));
     RETURN_IF_ERROR(Write(buffer, len));
     file_pos_ += len;
   }
@@ -1281,7 +1282,8 @@ Status HdfsParquetTableWriter::WritePageIndex() {
     column.column_index_.__isset.null_counts = true;
     uint8_t* buffer = nullptr;
     uint32_t len = 0;
-    RETURN_IF_ERROR(thrift_serializer_->Serialize(&column.column_index_, &len, &buffer));
+    RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer(
+        &column.column_index_, &len, &buffer));
     RETURN_IF_ERROR(Write(buffer, len));
     // Update the column_index_offset and column_index_length of the ColumnChunk
     row_group->columns[i].__set_column_index_offset(file_pos_);
@@ -1293,7 +1295,8 @@ Status HdfsParquetTableWriter::WritePageIndex() {
     auto& column = *columns_[i];
     uint8_t* buffer = nullptr;
     uint32_t len = 0;
-    RETURN_IF_ERROR(thrift_serializer_->Serialize(&column.offset_index_, &len, &buffer));
+    RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer(
+        &column.offset_index_, &len, &buffer));
     RETURN_IF_ERROR(Write(buffer, len));
     // Update the offset_index_offset and offset_index_length of the ColumnChunk
     row_group->columns[i].__set_offset_index_offset(file_pos_);
@@ -1307,8 +1310,8 @@ Status HdfsParquetTableWriter::WriteFileFooter() {
   // Write file_meta_data
   uint32_t file_metadata_len = 0;
   uint8_t* buffer = nullptr;
-  RETURN_IF_ERROR(
-      thrift_serializer_->Serialize(&file_metadata_, &file_metadata_len, &buffer));
+  RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer(
+      &file_metadata_, &file_metadata_len, &buffer));
   RETURN_IF_ERROR(Write(buffer, file_metadata_len));
 
   // Write footer

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index 33297c2..0aa7ac0 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -25,10 +25,12 @@
 #include <map>
 #include <boost/scoped_ptr.hpp>
 
-#include "util/compress.h"
-#include "runtime/descriptors.h"
 #include "exec/hdfs-table-writer.h"
 #include "exec/parquet-common.h"
+#include "runtime/descriptors.h"
+#include "util/compress.h"
+
+#include "gen-cpp/control_service.pb.h"
 
 namespace impala {
 
@@ -196,7 +198,7 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   std::vector<uint8_t> compression_staging_buffer_;
 
   /// For each column, the on disk size written.
-  TParquetInsertStats parquet_insert_stats_;
+  ParquetDmlStatsPB parquet_dml_stats_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 24df2ff..634dda4 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -52,9 +52,6 @@ using namespace strings;
 
 namespace impala {
 
-const static string& ROOT_PARTITION_KEY =
-    g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
-
 HdfsTableSink::HdfsTableSink(const RowDescriptor* row_desc, const TDataSink& tsink,
     RuntimeState* state)
   : DataSink(row_desc, "HdfsTableSink", state),
@@ -611,8 +608,8 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
   return Status::OK();
 }
 
-Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
-                                            OutputPartition* partition) {
+Status HdfsTableSink::FinalizePartitionFile(
+    RuntimeState* state, OutputPartition* partition) {
   if (partition->tmp_hdfs_file == nullptr && !overwrite_) return Status::OK();
   SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer"));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/exec/hdfs-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.cc b/be/src/exec/hdfs-table-writer.cc
index edc4be8..af49684 100644
--- a/be/src/exec/hdfs-table-writer.cc
+++ b/be/src/exec/hdfs-table-writer.cc
@@ -53,7 +53,7 @@ Status HdfsTableWriter::Write(const uint8_t* data, int32_t len) {
     return Status(msg.str());
   }
   COUNTER_ADD(parent_->bytes_written_counter(), len);
-  stats_.bytes_written += len;
+  stats_.set_bytes_written(stats_.bytes_written() + len);
   return Status::OK();
 }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index a6f06c0..2d92829 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -23,7 +23,7 @@
 #include <hdfs.h>
 
 #include "common/status.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/control_service.pb.h"
 
 namespace impala {
 
@@ -88,7 +88,7 @@ class HdfsTableWriter {
   virtual void Close() = 0;
 
   /// Returns the stats for this writer.
-  TInsertStats& stats() { return stats_; }
+  const DmlStatsPB& stats() const { return stats_; }
 
   /// Default block size to use for this file format.  If the file format doesn't
   /// care, it should return 0 and the hdfs config default will be used.
@@ -133,8 +133,7 @@ class HdfsTableWriter {
   const std::vector<ScalarExprEvaluator*>& output_expr_evals_;
 
   /// Subclass should populate any file format specific stats.
-  TInsertStats stats_;
-
+  DmlStatsPB stats_;
 };
 }
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 56a4190..be50575 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -47,17 +47,17 @@ target_link_libraries(thrift-server-test security-test-for-impala)
 ADD_BE_LSAN_TEST(authentication-test)
 
 ADD_BE_TEST(rpc-mgr-test) # TODO: this test leaks various KRPC things
-add_dependencies(rpc-mgr-test rpc_test_proto)
-target_link_libraries(rpc-mgr-test rpc_test_proto)
+add_dependencies(rpc-mgr-test rpc_test_proto_tgt)
+target_link_libraries(rpc-mgr-test rpc_test_proto_tgt)
 target_link_libraries(rpc-mgr-test security-test-for-impala)
 target_link_libraries(rpc-mgr-test ${KRB5_REALM_OVERRIDE})
 
 ADD_BE_TEST(rpc-mgr-kerberized-test) # TODO: this test leaks various KRPC things
-add_dependencies(rpc-mgr-kerberized-test rpc_test_proto)
-target_link_libraries(rpc-mgr-kerberized-test rpc_test_proto)
+add_dependencies(rpc-mgr-kerberized-test rpc_test_proto_tgt)
+target_link_libraries(rpc-mgr-kerberized-test rpc_test_proto_tgt)
 target_link_libraries(rpc-mgr-kerberized-test security-test-for-impala)
 target_link_libraries(rpc-mgr-kerberized-test ${KRB5_REALM_OVERRIDE})
 
-add_library(rpc_test_proto ${RPC_TEST_PROTO_SRCS})
-add_dependencies(rpc_test_proto rpc_test_proto_tgt krpc)
-target_link_libraries(rpc_test_proto krpc)
+add_library(rpc_test_proto_tgt ${RPC_TEST_PROTO_SRCS})
+add_dependencies(rpc_test_proto_tgt rpc_test_proto krpc)
+target_link_libraries(rpc_test_proto_tgt krpc)

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/rpc/jni-thrift-util.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/jni-thrift-util.h b/be/src/rpc/jni-thrift-util.h
index c55424d..a3674ef 100644
--- a/be/src/rpc/jni-thrift-util.h
+++ b/be/src/rpc/jni-thrift-util.h
@@ -33,7 +33,7 @@ Status SerializeThriftMsg(JNIEnv* env, T* msg, jbyteArray* serialized_msg) {
 
   uint8_t* buffer = NULL;
   uint32_t size = 0;
-  RETURN_IF_ERROR(serializer.Serialize<T>(msg, &size, &buffer));
+  RETURN_IF_ERROR(serializer.SerializeToBuffer(msg, &size, &buffer));
 
   /// create jbyteArray given buffer
   *serialized_msg = env->NewByteArray(size);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
index 0121787..86c9eaa 100644
--- a/be/src/rpc/rpc-mgr-kerberized-test.cc
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -108,8 +108,8 @@ TEST_F(RpcMgrKerberizedTest, AuthorizationFail) {
   // ScanMemService's authorization function always returns true so we should be able
   // to access with dummy credentials.
   unique_ptr<ScanMemServiceProxy> scan_proxy;
-  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(
-      krpc_address_, FLAGS_hostname, &scan_proxy));
+  ASSERT_OK(static_cast<ScanMemServiceImpl*>(scan_mem_impl)->GetProxy(krpc_address_,
+      FLAGS_hostname, &scan_proxy));
   ScanMemRequestPB scan_request;
   ScanMemResponsePB scan_response;
   SetupScanMemRequest(&scan_request, &controller);
@@ -120,8 +120,8 @@ TEST_F(RpcMgrKerberizedTest, AuthorizationFail) {
 
   // Fail to access PingService as it's expecting FLAGS_be_principal as principal name.
   unique_ptr<PingServiceProxy> ping_proxy;
-  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(
-      krpc_address_, FLAGS_hostname, &ping_proxy));
+  ASSERT_OK(static_cast<PingServiceImpl*>(ping_impl)->GetProxy(krpc_address_,
+      FLAGS_hostname, &ping_proxy));
   PingRequestPB ping_request;
   PingResponsePB ping_response;
   controller.Reset();

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 0d4ad47..07dbfe7 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -185,7 +185,8 @@ TEST_F(RpcMgrTest, SlowCallback) {
   ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
 
   unique_ptr<PingServiceProxy> proxy;
-  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, FLAGS_hostname, &proxy));
+  ASSERT_OK(static_cast<PingServiceImpl*>(ping_impl)->GetProxy(krpc_address_,
+      FLAGS_hostname, &proxy));
 
   PingRequestPB request;
   PingResponsePB response;
@@ -205,8 +206,8 @@ TEST_F(RpcMgrTest, AsyncCall) {
       static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
 
   unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
-  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, FLAGS_hostname,
-      &scan_mem_proxy));
+  ASSERT_OK(static_cast<ScanMemServiceImpl*>(scan_mem_impl)->GetProxy(krpc_address_,
+      FLAGS_hostname, &scan_mem_proxy));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/rpc/rpc-mgr-test.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.h b/be/src/rpc/rpc-mgr-test.h
index d95df36..42c47fe 100644
--- a/be/src/rpc/rpc-mgr-test.h
+++ b/be/src/rpc/rpc-mgr-test.h
@@ -22,11 +22,13 @@
 
 #include "common/init.h"
 #include "exec/kudu-util.h"
+#include "kudu/rpc/remote_user.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/util/status.h"
+#include "rpc/rpc-mgr.inline.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "testutil/gtest-util.h"
@@ -43,6 +45,7 @@
 #include "common/names.h"
 
 using kudu::rpc::GeneratedServiceIf;
+using kudu::rpc::RemoteUser;
 using kudu::rpc::RpcController;
 using kudu::rpc::RpcContext;
 using kudu::rpc::RpcSidecar;
@@ -174,10 +177,23 @@ class PingServiceImpl : public PingServiceIf {
       mem_tracker_(-1, "Ping Service"),
       cb_(cb) {}
 
+  Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
+      std::unique_ptr<PingServiceProxy>* proxy) {
+    return rpc_mgr_->GetProxy(address, hostname, proxy);
+  }
+
   virtual bool Authorize(const google::protobuf::Message* req,
       google::protobuf::Message* resp, RpcContext* context) override {
     if (!IsKerberosEnabled()) {
-      return context->remote_user().username() == "impala";
+      const RemoteUser& remote_user = context->remote_user();
+      if (remote_user.username() != "impala") {
+        mem_tracker_.Release(context->GetTransferSize());
+        context->RespondFailure(kudu::Status::NotAuthorized(
+            Substitute("$0 is not allowed to access PingService",
+                remote_user.ToString())));
+        return false;
+      }
+      return true;
     } else {
       return rpc_mgr_->Authorize("PingService", context, mem_tracker());
     }
@@ -203,9 +219,15 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
  public:
   ScanMemServiceImpl(RpcMgr* rpc_mgr)
     : ScanMemServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
+      rpc_mgr_(rpc_mgr),
       mem_tracker_(-1, "ScanMem Service") {
   }
 
+  Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
+      std::unique_ptr<ScanMemServiceProxy>* proxy) {
+    return rpc_mgr_->GetProxy(address, hostname, proxy);
+  }
+
   // A no-op authorization function.
   virtual bool Authorize(const google::protobuf::Message* req,
       google::protobuf::Message* resp, RpcContext* context) override {
@@ -241,6 +263,7 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
   MemTracker* mem_tracker() { return &mem_tracker_; }
 
  private:
+  RpcMgr* rpc_mgr_;
   MemTracker mem_tracker_;
 
 };
@@ -266,12 +289,12 @@ Status RpcMgrTest::RunMultipleServicesTest(
   RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
 
   unique_ptr<PingServiceProxy> ping_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, FLAGS_hostname,
-      &ping_proxy));
+  RETURN_IF_ERROR(static_cast<PingServiceImpl*>(ping_impl)->GetProxy(krpc_address,
+      FLAGS_hostname, &ping_proxy));
 
   unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, FLAGS_hostname,
-      &scan_mem_proxy));
+  RETURN_IF_ERROR(static_cast<ScanMemServiceImpl*>(scan_mem_impl)->GetProxy(krpc_address,
+      FLAGS_hostname, &scan_mem_proxy));
 
   RpcController controller;
   srand(0);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index 6435a74..7829897 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -143,9 +143,9 @@ class RpcMgr {
   bool Authorize(const string& service_name, kudu::rpc::RpcContext* context,
       MemTracker* mem_tracker) const;
 
-  /// Creates a new proxy for a remote service of type P at location 'address' with
-  /// hostname 'hostname' and places it in 'proxy'. 'P' must descend from
-  /// kudu::rpc::ServiceIf. Note that 'address' must be a resolved IP address.
+  /// Creates a new proxy of type P at location 'address' with hostname 'hostname' and
+  /// places it in 'proxy'. 'P' must descend from kudu::rpc::Proxy. Note that 'address'
+  /// must be a resolved IP address.
   template <typename P>
   Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
       std::unique_ptr<P>* proxy) WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/rpc/thrift-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util-test.cc b/be/src/rpc/thrift-util-test.cc
index 7193e49..a546f2b 100644
--- a/be/src/rpc/thrift-util-test.cc
+++ b/be/src/rpc/thrift-util-test.cc
@@ -41,20 +41,20 @@ TEST(ThriftUtil, SimpleSerializeDeserialize) {
     counter.__set_value(123);
 
     vector<uint8_t> msg;
-    EXPECT_OK(serializer.Serialize(&counter, &msg));
+    EXPECT_OK(serializer.SerializeToVector(&counter, &msg));
 
     uint8_t* buffer1 = NULL;
     uint8_t* buffer2 = NULL;
     uint32_t len1 = 0;
     uint32_t len2 = 0;
 
-    EXPECT_OK(serializer.Serialize(&counter, &len1, &buffer1));
+    EXPECT_OK(serializer.SerializeToBuffer(&counter, &len1, &buffer1));
 
     EXPECT_EQ(len1, msg.size());
     EXPECT_TRUE(memcmp(buffer1, msg.data(), len1) == 0);
 
     // Serialize again and ensure the memory buffer is the same and being reused.
-    EXPECT_OK(serializer.Serialize(&counter, &len2, &buffer2));
+    EXPECT_OK(serializer.SerializeToBuffer(&counter, &len2, &buffer2));
 
     EXPECT_EQ(len1, len2);
     EXPECT_TRUE(buffer1 == buffer2);
@@ -63,6 +63,17 @@ TEST(ThriftUtil, SimpleSerializeDeserialize) {
     EXPECT_OK(DeserializeThriftMsg(buffer1, &len1, compact, &deserialized_counter));
     EXPECT_EQ(len1, len2);
     EXPECT_TRUE(counter == deserialized_counter);
+
+    // Serialize to string
+    std::string str;
+    EXPECT_OK(serializer.SerializeToString(&counter, &str));
+    EXPECT_EQ(len2, str.length());
+
+    // Verifies that deserialization of 'str' works.
+    TCounter deserialized_counter_2;
+    EXPECT_OK(DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(str.data()), &len2,
+        compact, &deserialized_counter_2));
+    EXPECT_TRUE(counter == deserialized_counter_2);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/rpc/thrift-util.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index ed95a71..bd15491 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -49,10 +49,10 @@ class ThriftSerializer {
 
   /// Serializes obj into result.  Result will contain a copy of the memory.
   template <class T>
-  Status Serialize(const T* obj, std::vector<uint8_t>* result) {
+  Status SerializeToVector(const T* obj, std::vector<uint8_t>* result) {
     uint32_t len;
     uint8_t* buffer;
-    RETURN_IF_ERROR(Serialize(obj, &len, &buffer));
+    RETURN_IF_ERROR(SerializeToBuffer(obj, &len, &buffer));
     result->assign(buffer, buffer + len);
     return Status::OK();
   }
@@ -61,7 +61,7 @@ class ThriftSerializer {
   /// memory returned is owned by this object and will be invalid when another object
   /// is serialized.
   template <class T>
-  Status Serialize(const T* obj, uint32_t* len, uint8_t** buffer) {
+  Status SerializeToBuffer(const T* obj, uint32_t* len, uint8_t** buffer) {
     try {
       mem_buffer_->resetBuffer();
       obj->write(protocol_.get());
@@ -75,7 +75,7 @@ class ThriftSerializer {
   }
 
   template <class T>
-  Status Serialize(const T* obj, std::string* result) {
+  Status SerializeToString(const T* obj, std::string* result) {
     try {
       mem_buffer_->resetBuffer();
       obj->write(protocol_.get());

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/backend-client.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index a0f27c7..977049a 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -55,16 +55,6 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
     ImpalaInternalServiceClient::recv_ExecQueryFInstances(_return);
   }
 
-  void ReportExecStatus(TReportExecStatusResult& _return,
-      const TReportExecStatusParams& params, bool* send_done) {
-    DCHECK(!*send_done);
-    FAULT_INJECTION_SEND_RPC_EXCEPTION(16);
-    ImpalaInternalServiceClient::send_ReportExecStatus(params);
-    *send_done = true;
-    FAULT_INJECTION_RECV_RPC_EXCEPTION(16);
-    ImpalaInternalServiceClient::recv_ReportExecStatus(_return);
-  }
-
   void CancelQueryFInstances(TCancelQueryFInstancesResult& _return,
       const TCancelQueryFInstancesParams& params, bool* send_done) {
     DCHECK(!*send_done);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 9acab79..4d5ccbd 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -21,6 +21,7 @@
 
 #include "common/object-pool.h"
 #include "exec/exec-node.h"
+#include "exec/kudu-util.h"
 #include "exec/scan-node.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
@@ -28,9 +29,12 @@
 #include "runtime/client-cache.h"
 #include "runtime/backend-client.h"
 #include "runtime/coordinator-filter-state.h"
+#include "util/error-util-internal.h"
 #include "util/uid-util.h"
 #include "util/network-util.h"
 #include "util/counting-barrier.h"
+
+#include "gen-cpp/control_service.pb.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 
 #include "common/names.h"
@@ -265,37 +269,61 @@ inline bool Coordinator::BackendState::IsDone() const {
 }
 
 bool Coordinator::BackendState::ApplyExecStatusReport(
-    const TReportExecStatusParams& backend_exec_status, ExecSummary* exec_summary,
-    ProgressUpdater* scan_range_progress) {
+    const ReportExecStatusRequestPB& backend_exec_status,
+    const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary,
+    ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state) {
+  // Hold the exec_summary's lock to avoid exposing it half-way through
+  // the update loop below.
   lock_guard<SpinLock> l1(exec_summary->lock);
-  lock_guard<mutex> l2(lock_);
+  unique_lock<mutex> lock(lock_);
   last_report_time_ms_ = MonotonicMillis();
 
   // If this backend completed previously, don't apply the update.
   if (IsDone()) return false;
-  for (const TFragmentInstanceExecStatus& instance_exec_status:
-      backend_exec_status.instance_exec_status) {
-    Status instance_status(instance_exec_status.status);
-    int instance_idx = GetInstanceIdx(instance_exec_status.fragment_instance_id);
+  for (const FragmentInstanceExecStatusPB& instance_exec_status :
+           backend_exec_status.instance_exec_status()) {
+    int64_t report_seq_no = instance_exec_status.report_seq_no();
+    int instance_idx = GetInstanceIdx(instance_exec_status.fragment_instance_id());
     DCHECK_EQ(instance_stats_map_.count(instance_idx), 1);
     InstanceStats* instance_stats = instance_stats_map_[instance_idx];
     DCHECK(instance_stats->exec_params_.instance_id ==
-        instance_exec_status.fragment_instance_id);
+        ProtoToQueryId(instance_exec_status.fragment_instance_id()));
     // Ignore duplicate or out-of-order messages.
-    if (instance_stats->done_) continue;
+    if (report_seq_no <= instance_stats->last_report_seq_no_) {
+      VLOG_QUERY << Substitute("Ignoring stale update for query instance $0 with "
+          "seq no $1", PrintId(instance_stats->exec_params_.instance_id), report_seq_no);
+      continue;
+    }
+
+    DCHECK(!instance_stats->done_);
+    instance_stats->Update(
+        instance_exec_status, thrift_profile, exec_summary, scan_range_progress);
+
+    // Update DML stats
+    if (instance_exec_status.has_dml_exec_status()) {
+      dml_exec_state->Update(instance_exec_status.dml_exec_status());
+    }
 
-    instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress);
+    // Log messages aggregated by type
+    if (instance_exec_status.error_log_size() > 0) {
+      // Append the log messages from each update with the global state of the query
+      // execution
+      MergeErrorMaps(instance_exec_status.error_log(), &error_log_);
+      VLOG_FILE << "host=" << TNetworkAddressToString(host_) << " error log: " <<
+          PrintErrorMapToString(error_log_);
+    }
 
     // If a query is aborted due to an error encountered by a single fragment instance,
     // all other fragment instances will report a cancelled status; make sure not to mask
     // the original error status.
+    const Status instance_status(instance_exec_status.status());
     if (!instance_status.ok() && (status_.ok() || status_.IsCancelled())) {
       status_ = instance_status;
-      failed_instance_id_ = instance_exec_status.fragment_instance_id;
+      failed_instance_id_ = ProtoToQueryId(instance_exec_status.fragment_instance_id());
       is_fragment_failure_ = true;
     }
     DCHECK_GT(num_remaining_instances_, 0);
-    if (instance_exec_status.done) {
+    if (instance_exec_status.done()) {
       DCHECK(!instance_stats->done_);
       instance_stats->done_ = true;
       --num_remaining_instances_;
@@ -319,20 +347,11 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
   // status_ has incorporated the status from all fragment instances. If the overall
   // backend status is not OK, but no specific fragment instance reported an error, then
   // this is a general backend error. Incorporate the general error into status_.
-  Status overall_backend_status(backend_exec_status.status);
+  Status overall_backend_status(backend_exec_status.status());
   if (!overall_backend_status.ok() && (status_.ok() || status_.IsCancelled())) {
     status_ = overall_backend_status;
   }
 
-  // Log messages aggregated by type
-  if (backend_exec_status.__isset.error_log && backend_exec_status.error_log.size() > 0) {
-    // Append the log messages from each update with the global state of the query
-    // execution
-    MergeErrorMaps(backend_exec_status.error_log, &error_log_);
-    VLOG_FILE << "host=" << TNetworkAddressToString(host_) << " error log: " <<
-        PrintErrorMapToString(error_log_);
-  }
-
   // TODO: keep backend-wide stopwatch?
   return IsDone();
 }
@@ -463,15 +482,17 @@ void Coordinator::BackendState::InstanceStats::InitCounters() {
         p->GetCounter(ScanNode::BYTES_READ_COUNTER);
     if (bytes_read != nullptr) bytes_read_counters_.push_back(bytes_read);
   }
-
 }
 
 void Coordinator::BackendState::InstanceStats::Update(
-    const TFragmentInstanceExecStatus& exec_status, ExecSummary* exec_summary,
+    const FragmentInstanceExecStatusPB& exec_status,
+    const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary,
     ProgressUpdater* scan_range_progress) {
   last_report_time_ms_ = MonotonicMillis();
-  if (exec_status.done) stopwatch_.Stop();
-  profile_->Update(exec_status.profile);
+  DCHECK_GT(exec_status.report_seq_no(), last_report_seq_no_);
+  last_report_seq_no_ = exec_status.report_seq_no();
+  if (exec_status.done()) stopwatch_.Stop();
+  profile_->Update(thrift_profile);
   if (!profile_created_) {
     profile_created_ = true;
     InitCounters();
@@ -482,7 +503,6 @@ void Coordinator::BackendState::InstanceStats::Update(
   // TODO: why do this every time we get an updated instance profile?
   vector<RuntimeProfile*> children;
   profile_->GetAllChildren(&children);
-
   TExecSummary& thrift_exec_summary = exec_summary->thrift_exec_summary;
   for (RuntimeProfile* child: children) {
     int node_id = ExecNode::GetNodeIdFromProfile(child);
@@ -493,14 +513,17 @@ void Coordinator::BackendState::InstanceStats::Update(
         thrift_exec_summary.nodes[exec_summary->node_id_to_idx_map[node_id]];
     int per_fragment_instance_idx = exec_params_.per_fragment_instance_idx;
     DCHECK_LT(per_fragment_instance_idx, node_exec_summary.exec_stats.size())
-        << " node_id=" << node_id << " instance_id=" << PrintId(exec_params_.instance_id)
+        << " node_id=" << node_id << " instance_id="
+        << PrintId(exec_params_.instance_id)
         << " fragment_idx=" << exec_params_.fragment().idx;
     TExecStats& instance_stats =
         node_exec_summary.exec_stats[per_fragment_instance_idx];
 
     RuntimeProfile::Counter* rows_counter = child->GetCounter("RowsReturned");
     RuntimeProfile::Counter* mem_counter = child->GetCounter("PeakMemoryUsage");
-    if (rows_counter != nullptr) instance_stats.__set_cardinality(rows_counter->value());
+    if (rows_counter != nullptr) {
+      instance_stats.__set_cardinality(rows_counter->value());
+    }
     if (mem_counter != nullptr) instance_stats.__set_memory_used(mem_counter->value());
     instance_stats.__set_latency_ns(child->local_time());
     // TODO: track interesting per-node metrics
@@ -515,7 +538,7 @@ void Coordinator::BackendState::InstanceStats::Update(
   scan_range_progress->Update(delta);
 
   // extract the current execution state of this instance
-  current_state_ = exec_status.current_state;
+  current_state_ = exec_status.current_state();
 }
 
 void Coordinator::BackendState::InstanceStats::ToJson(Value* value, Document* document) {

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 1154cd8..6bc4c67 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -32,9 +32,12 @@
 
 #include "runtime/coordinator.h"
 #include "scheduling/query-schedule.h"
+#include "util/error-util-internal.h"
 #include "util/progress-updater.h"
 #include "util/stopwatch.h"
 #include "util/runtime-profile.h"
+#include "gen-cpp/control_service.pb.h"
+#include "gen-cpp/RuntimeProfile_types.h"
 #include "gen-cpp/Types_types.h"
 
 namespace impala {
@@ -43,9 +46,9 @@ class ProgressUpdater;
 class ObjectPool;
 class DebugOptions;
 class CountingBarrier;
+class ReportExecStatusRequestPB;
 class TUniqueId;
 class TQueryCtx;
-class TReportExecStatusParams;
 class ExecSummary;
 struct FInstanceExecParams;
 
@@ -81,8 +84,9 @@ class Coordinator::BackendState {
   /// becomes the first reported error status. Returns true iff this update changed
   /// IsDone() from false to true, either because it was the last fragment to complete or
   /// because it was the first error received.
-  bool ApplyExecStatusReport(const TReportExecStatusParams& backend_exec_status,
-      ExecSummary* exec_summary, ProgressUpdater* scan_range_progress);
+  bool ApplyExecStatusReport(const ReportExecStatusRequestPB& backend_exec_status,
+      const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary,
+      ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state);
 
   /// Update completion_times, rates, and avg_profile for all fragment_stats.
   void UpdateExecStats(const std::vector<FragmentStats*>& fragment_stats);
@@ -145,11 +149,12 @@ class Coordinator::BackendState {
     InstanceStats(const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
         ObjectPool* obj_pool);
 
-    /// Updates 'this' with exec_status, the fragment instances' TExecStats in
-    /// exec_summary, and 'progress_updater' with the number of newly completed scan
-    /// ranges. Also updates the instance's avg profile.
-    /// Caller must hold BackendState::lock_.
-    void Update(const TFragmentInstanceExecStatus& exec_status, ExecSummary* exec_summary,
+    /// Updates 'this' with exec_status and the fragment intance's thrift profile. Also
+    /// updates the fragment instance's TExecStats in exec_summary and 'progress_updater'
+    /// with the number of newly completed scan ranges. Also updates the instance's avg
+    /// profile. Caller must hold BackendState::lock_.
+    void Update(const FragmentInstanceExecStatusPB& exec_status,
+        const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary,
         ProgressUpdater* scan_range_progress);
 
     int per_fragment_instance_idx() const {
@@ -170,6 +175,9 @@ class Coordinator::BackendState {
     /// Set in Update(). Uses MonotonicMillis().
     int64_t last_report_time_ms_ = 0;
 
+    /// The sequence number of the last report.
+    int64_t last_report_seq_no_ = 0;
+
     /// owned by coordinator object pool provided in the c'tor, created in Update()
     RuntimeProfile* profile_ = nullptr;
 
@@ -198,7 +206,7 @@ class Coordinator::BackendState {
 
     /// The current state of this fragment instance's execution. This gets serialized in
     /// ToJson() and is displayed in the debug webpages.
-    TFInstanceExecState::type current_state_ = TFInstanceExecState::WAITING_FOR_EXEC;
+    FInstanceExecStatePB current_state_ = FInstanceExecStatePB::WAITING_FOR_EXEC;
 
     /// Extracts scan_ranges_complete_counters_ and  bytes_read_counters_ from profile_.
     void InitCounters();

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index eb9fe81..47be2e2 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -660,23 +660,21 @@ void Coordinator::CancelBackends() {
       PrintId(query_id()), num_cancelled);
 }
 
-Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
+Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
+    const TRuntimeProfileTree& thrift_profile) {
+  const int32_t coord_state_idx = request.coord_state_idx();
   VLOG_FILE << "UpdateBackendExecStatus() query_id=" << PrintId(query_id())
-            << " backend_idx=" << params.coord_state_idx;
-  if (params.coord_state_idx >= backend_states_.size()) {
+            << " backend_idx=" << coord_state_idx;
+
+  if (coord_state_idx >= backend_states_.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
         Substitute("Unknown backend index $0 (max known: $1)",
-            params.coord_state_idx, backend_states_.size() - 1));
-  }
-  BackendState* backend_state = backend_states_[params.coord_state_idx];
-
-  // TODO: only do this when the sink is done; probably missing a done field
-  // in TReportExecStatus for that
-  if (params.__isset.insert_exec_status) {
-    dml_exec_state_.Update(params.insert_exec_status);
+            coord_state_idx, backend_states_.size() - 1));
   }
+  BackendState* backend_state = backend_states_[coord_state_idx];
 
-  if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
+  if (backend_state->ApplyExecStatusReport(request, thrift_profile, &exec_summary_,
+          &progress_, &dml_exec_state_)) {
     // This backend execution has completed.
     if (VLOG_QUERY_IS_ON) {
       // Don't log backend completion if the query has already been cancelled.

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 4fb7e25..a3172ba 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -37,20 +37,19 @@
 namespace impala {
 
 class CountingBarrier;
+class FragmentInstanceState;
+class MemTracker;
 class ObjectPool;
-class RuntimeState;
-class TUpdateCatalogRequest;
-class TReportExecStatusParams;
-class TPlanExecRequest;
-class TRuntimeProfileTree;
-class RuntimeProfile;
+class PlanRootSink;
 class QueryResultSet;
 class QuerySchedule;
-class MemTracker;
-class PlanRootSink;
-class FragmentInstanceState;
 class QueryState;
-
+class ReportExecStatusRequestPB;
+class RuntimeProfile;
+class RuntimeState;
+class TPlanExecRequest;
+class TRuntimeProfileTree;
+class TUpdateCatalogRequest;
 
 /// Query coordinator: handles execution of fragment instances on remote nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the executing
@@ -91,8 +90,8 @@ class QueryState;
 /// Lock ordering: (lower-numbered acquired before higher-numbered)
 /// 1. wait_lock_
 /// 2. filter_lock_
-/// 3. exec_state_lock_, backend_states_init_lock_, filter_update_lock_,
-///    ExecSummary::lock (leafs)
+/// 3. exec_state_lock_, backend_states_init_lock_, filter_update_lock_, ExecSummary::lock
+/// 4. Coordinator::BackendState::lock_ (leafs)
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files
 /// and unnest them
@@ -127,11 +126,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// query is still executing. Idempotent.
   void Cancel();
 
-  /// Called by the report status RPC handler to update execution status of a
-  /// particular backend as well as dml_exec_state_ and the profile. This may block if
-  /// exec RPCs are pending.
-  Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
-      WARN_UNUSED_RESULT;
+  /// Called by the report status RPC handler to update execution status of a particular
+  /// backend as well as dml_exec_state_ and the profile. This may block if exec RPCs are
+  /// pending. 'request' contains details of the status update. 'thrift_profile' is the
+  /// Thrift runtime profile from the backend.
+  Status UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
+      const TRuntimeProfileTree& thrift_profile) WARN_UNUSED_RESULT;
 
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in
@@ -148,7 +148,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// individual fragment instances are merged into a single output to retain readability.
   std::string GetErrorLog();
 
-  const ProgressUpdater& progress() { return progress_; }
+  const ProgressUpdater& progress() const { return progress_; }
 
   /// Get a copy of the current exec summary. Thread-safe.
   void GetTExecSummary(TExecSummary* exec_summary);

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/dml-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index 6853da5..5ab1bce 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -25,6 +25,7 @@
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
+#include "exec/data-sink.h"
 #include "util/pretty-printer.h"
 #include "util/container-util.h"
 #include "util/hdfs-bulk-ops.h"
@@ -33,6 +34,7 @@
 #include "runtime/descriptors.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/exec-env.h"
+#include "gen-cpp/control_service.pb.h"
 #include "gen-cpp/ImpalaService_types.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
@@ -47,6 +49,9 @@ using namespace impala;
 using boost::algorithm::is_any_of;
 using boost::algorithm::split;
 
+typedef google::protobuf::Map<string, int64> PerColumnSizePBMap;
+typedef google::protobuf::Map<string, DmlPartitionStatusPB> PerPartitionStatusPBMap;
+
 string DmlExecState::OutputPartitionStats(const string& prefix) {
   lock_guard<mutex> l(lock_);
   const char* indent = "  ";
@@ -58,55 +63,53 @@ string DmlExecState::OutputPartitionStats(const string& prefix) {
     first = false;
     ss << "Partition: ";
     const string& partition_key = val.first;
-    if (partition_key == g_ImpalaInternalService_constants.ROOT_PARTITION_KEY) {
+    if (partition_key == DataSink::ROOT_PARTITION_KEY) {
       ss << "Default" << endl;
     } else {
       ss << partition_key << endl;
     }
-    if (val.second.__isset.num_modified_rows) {
-      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
+    if (val.second.has_num_modified_rows()) {
+      ss << "NumModifiedRows: " << val.second.num_modified_rows() << endl;
     }
 
-    if (!val.second.__isset.stats) continue;
-    const TInsertStats& stats = val.second.stats;
-    if (stats.__isset.kudu_stats) {
-      ss << "NumRowErrors: " << stats.kudu_stats.num_row_errors << endl;
+    if (!val.second.has_stats()) continue;
+    const DmlStatsPB& stats = val.second.stats();
+    if (stats.has_kudu_stats()) {
+      ss << "NumRowErrors: " << stats.kudu_stats().num_row_errors() << endl;
     }
 
     ss << indent << "BytesWritten: "
-       << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
-    if (stats.__isset.parquet_stats) {
-      const TParquetInsertStats& parquet_stats = stats.parquet_stats;
+       << PrettyPrinter::Print(stats.bytes_written(), TUnit::BYTES);
+    if (stats.has_parquet_stats()) {
+      const ParquetDmlStatsPB& parquet_stats = stats.parquet_stats();
       ss << endl << indent << "Per Column Sizes:";
-      for (map<string, int64_t>::const_iterator i = parquet_stats.per_column_size.begin();
-           i != parquet_stats.per_column_size.end(); ++i) {
-        ss << endl << indent << indent << i->first << ": "
-           << PrettyPrinter::Print(i->second, TUnit::BYTES);
+      for (const PerColumnSizePBMap::value_type& i : parquet_stats.per_column_size()) {
+        ss << endl << indent << indent << i.first << ": "
+           << PrettyPrinter::Print(i.second, TUnit::BYTES);
       }
     }
   }
   return ss.str();
 }
 
-void DmlExecState::Update(const TInsertExecStatus& dml_exec_status) {
+void DmlExecState::Update(const DmlExecStatusPB& dml_exec_status) {
   lock_guard<mutex> l(lock_);
-  for (const PartitionStatusMap::value_type& partition:
-           dml_exec_status.per_partition_status) {
-    TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
-    status->__set_num_modified_rows(
-        status->num_modified_rows + partition.second.num_modified_rows);
-    status->__set_kudu_latest_observed_ts(max<uint64_t>(
-            partition.second.kudu_latest_observed_ts, status->kudu_latest_observed_ts));
-    status->__set_id(partition.second.id);
-    status->__set_partition_base_dir(partition.second.partition_base_dir);
-
-    if (partition.second.__isset.stats) {
-      if (!status->__isset.stats) status->__set_stats(TInsertStats());
-      MergeDmlStats(partition.second.stats, &status->stats);
+  const PerPartitionStatusPBMap& new_partition_status_map =
+      dml_exec_status.per_partition_status();
+  for (const PerPartitionStatusPBMap::value_type& part : new_partition_status_map) {
+    DmlPartitionStatusPB* status = &(per_partition_status_[part.first]);
+    status->set_num_modified_rows(
+        status->num_modified_rows() + part.second.num_modified_rows());
+    status->set_kudu_latest_observed_ts(max<uint64_t>(
+        part.second.kudu_latest_observed_ts(), status->kudu_latest_observed_ts()));
+    status->set_id(part.second.id());
+    status->set_partition_base_dir(part.second.partition_base_dir());
+    if (part.second.has_stats()) {
+      MergeDmlStats(part.second.stats(), status->mutable_stats());
     }
   }
   files_to_move_.insert(
-      dml_exec_status.files_to_move.begin(), dml_exec_status.files_to_move.end());
+      dml_exec_status.files_to_move().begin(), dml_exec_status.files_to_move().end());
 }
 
 void DmlExecState::AddFileToMove(const string& file_name, const string& location) {
@@ -117,8 +120,8 @@ void DmlExecState::AddFileToMove(const string& file_name, const string& location
 uint64_t DmlExecState::GetKuduLatestObservedTimestamp() {
   lock_guard<mutex> l(lock_);
   uint64_t max_ts = 0;
-  for (const auto& entry : per_partition_status_) {
-    max_ts = max<uint64_t>(max_ts, entry.second.kudu_latest_observed_ts);
+  for (const PartitionStatusMap::value_type& p : per_partition_status_) {
+    max_ts = max<uint64_t>(max_ts, p.second.kudu_latest_observed_ts());
   }
   return max_ts;
 }
@@ -126,15 +129,15 @@ uint64_t DmlExecState::GetKuduLatestObservedTimestamp() {
 int64_t DmlExecState::GetNumModifiedRows() {
   lock_guard<mutex> l(lock_);
   int64_t result = 0;
-  for (const PartitionStatusMap::value_type& p: per_partition_status_) {
-    result += p.second.num_modified_rows;
+  for (const PartitionStatusMap::value_type& p : per_partition_status_) {
+    result += p.second.num_modified_rows();
   }
   return result;
 }
 
 bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
   lock_guard<mutex> l(lock_);
-  for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
+  for (const PartitionStatusMap::value_type& partition : per_partition_status_) {
     catalog_update->created_partitions.insert(partition.first);
   }
   return catalog_update->created_partitions.size() != 0;
@@ -154,7 +157,7 @@ Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
 
   // Loop over all partitions that were updated by this insert, and create the set of
   // filesystem operations required to create the correct partition structure on disk.
-  for (const PartitionStatusMap::value_type& partition: per_partition_status_) {
+  for (const PartitionStatusMap::value_type& partition : per_partition_status_) {
     SCOPED_TIMER(ADD_CHILD_TIMER(profile, "Overwrite/PartitionCreationTimer",
             "FinalizationTimer"));
     // INSERT allows writes to tables that have partitions on multiple filesystems.
@@ -163,19 +166,19 @@ Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
     // partitions are on.
     hdfsFS partition_fs_connection;
     RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
-            partition.second.partition_base_dir, &partition_fs_connection,
-            &filesystem_connection_cache));
+        partition.second.partition_base_dir(), &partition_fs_connection,
+        &filesystem_connection_cache));
 
     // Look up the partition in the descriptor table.
     stringstream part_path_ss;
-    if (partition.second.id == -1) {
+    if (partition.second.id() == -1) {
       // If this is a non-existant partition, use the default partition location of
       // <base_dir>/part_key_1=val/part_key_2=val/...
       part_path_ss << params.hdfs_base_dir << "/" << partition.first;
     } else {
-      HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
+      HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id());
       DCHECK(part != nullptr)
-          << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id;
+          << "table_id=" << hdfs_table->id() << " partition_id=" << partition.second.id();
       part_path_ss << part->location();
     }
     const string& part_path = part_path_ss.str();
@@ -259,7 +262,7 @@ Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
             "FinalizationTimer"));
     if (!partition_create_ops.Execute(
             ExecEnv::GetInstance()->hdfs_op_thread_pool(), false)) {
-      for (const HdfsOperationSet::Error& err: partition_create_ops.errors()) {
+      for (const HdfsOperationSet::Error& err : partition_create_ops.errors()) {
         // It's ok to ignore errors creating the directories, since they may already
         // exist. If there are permission errors, we'll run into them later.
         if (err.first->op() != CREATE_DIR) {
@@ -275,7 +278,7 @@ Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
   HdfsOperationSet move_ops(&filesystem_connection_cache);
   HdfsOperationSet dir_deletion_ops(&filesystem_connection_cache);
 
-  for (FileMoveMap::value_type& move: files_to_move_) {
+  for (const FileMoveMap::value_type& move : files_to_move_) {
     // Empty destination means delete, so this is a directory. These get deleted in a
     // separate pass to ensure that we have moved all the contents of the directory first.
     if (move.second.empty()) {
@@ -317,7 +320,7 @@ Status DmlExecState::FinalizeHdfsInsert(const TFinalizeParams& params,
   // Do this last so that we don't make a dir unwritable before we write to it.
   if (FLAGS_insert_inherit_permissions) {
     HdfsOperationSet chmod_ops(&filesystem_connection_cache);
-    for (const PermissionCache::value_type& perm: permissions_cache) {
+    for (const PermissionCache::value_type& perm : permissions_cache) {
       bool new_dir = perm.second.first;
       if (new_dir) {
         short permissions = perm.second.second;
@@ -359,7 +362,7 @@ void DmlExecState::PopulatePathPermissionCache(hdfsFS fs, const string& path_str
   vector<string> prefixes;
   // Stores the current prefix
   stringstream accumulator;
-  for (const string& component: components) {
+  for (const string& component : components) {
     if (component.empty()) continue;
     accumulator << "/" << component;
     prefixes.push_back(accumulator.str());
@@ -377,7 +380,7 @@ void DmlExecState::PopulatePathPermissionCache(hdfsFS fs, const string& path_str
   // Set to the permission of the immediate parent (i.e. the permissions to inherit if the
   // current dir doesn't exist).
   short permissions = 0;
-  for (const string& path: prefixes) {
+  for (const string& path : prefixes) {
     PermissionCache::const_iterator it = permissions_cache->find(path);
     if (it == permissions_cache->end()) {
       hdfsFileInfo* info = hdfsGetPathInfo(fs, path.c_str());
@@ -398,18 +401,15 @@ void DmlExecState::PopulatePathPermissionCache(hdfsFS fs, const string& path_str
   }
 }
 
-bool DmlExecState::ToThrift(TInsertExecStatus* dml_status) {
+void DmlExecState::ToProto(DmlExecStatusPB* dml_status) {
+  dml_status->Clear();
   lock_guard<mutex> l(lock_);
-  bool set_thrift = false;
-  if (files_to_move_.size() > 0) {
-    dml_status->__set_files_to_move(files_to_move_);
-    set_thrift = true;
+  for (const FileMoveMap::value_type& file : files_to_move_) {
+    (*dml_status->mutable_files_to_move())[file.first] = file.second;
   }
-  if (per_partition_status_.size() > 0) {
-    dml_status->__set_per_partition_status(per_partition_status_);
-    set_thrift = true;
+  for (const PartitionStatusMap::value_type& part : per_partition_status_) {
+    (*dml_status->mutable_per_partition_status())[part.first] = part.second;
   }
-  return set_thrift;
 }
 
 void DmlExecState::ToTInsertResult(TInsertResult* insert_result) {
@@ -417,11 +417,11 @@ void DmlExecState::ToTInsertResult(TInsertResult* insert_result) {
   int64_t num_row_errors = 0;
   bool has_kudu_stats = false;
   for (const PartitionStatusMap::value_type& v: per_partition_status_) {
-    insert_result->rows_modified[v.first] = v.second.num_modified_rows;
-    if (v.second.__isset.stats && v.second.stats.__isset.kudu_stats) {
+    insert_result->rows_modified[v.first] = v.second.num_modified_rows();
+    if (v.second.has_stats() && v.second.stats().has_kudu_stats()) {
       has_kudu_stats = true;
     }
-    num_row_errors += v.second.stats.kudu_stats.num_row_errors;
+    num_row_errors += v.second.stats().kudu_stats().num_row_errors();
   }
   if (has_kudu_stats) insert_result->__set_num_row_errors(num_row_errors);
 }
@@ -430,65 +430,64 @@ void DmlExecState::AddPartition(
     const string& name, int64_t id, const string* base_dir) {
   lock_guard<mutex> l(lock_);
   DCHECK(per_partition_status_.find(name) == per_partition_status_.end());
-  TInsertPartitionStatus status;
-  status.__set_num_modified_rows(0L);
-  status.__set_id(id);
-  status.__isset.stats = true;
-  if (base_dir != nullptr) status.__set_partition_base_dir(*base_dir);
+  DmlPartitionStatusPB status;
+  status.set_num_modified_rows(0L);
+  status.set_id(id);
+  status.mutable_stats()->set_bytes_written(0L);
+  status.set_partition_base_dir(base_dir != nullptr ? *base_dir : "");
   per_partition_status_.insert(make_pair(name, status));
 }
 
 void DmlExecState::UpdatePartition(const string& partition_name,
-    int64_t num_modified_rows_delta, const TInsertStats* insert_stats) {
+    int64_t num_modified_rows_delta, const DmlStatsPB* insert_stats) {
   lock_guard<mutex> l(lock_);
   PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
   DCHECK(entry != per_partition_status_.end());
-  entry->second.num_modified_rows += num_modified_rows_delta;
+  entry->second.set_num_modified_rows(
+      entry->second.num_modified_rows() + num_modified_rows_delta);
   if (insert_stats == nullptr) return;
-  MergeDmlStats(*insert_stats, &entry->second.stats);
+  MergeDmlStats(*insert_stats, entry->second.mutable_stats());
 }
 
-void DmlExecState::MergeDmlStats(const TInsertStats& src, TInsertStats* dst) {
-  dst->bytes_written += src.bytes_written;
-  if (src.__isset.kudu_stats) {
-    dst->__isset.kudu_stats = true;
-    if (!dst->kudu_stats.__isset.num_row_errors) {
-      dst->kudu_stats.__set_num_row_errors(0);
-    }
-    dst->kudu_stats.__set_num_row_errors(
-        dst->kudu_stats.num_row_errors + src.kudu_stats.num_row_errors);
+void DmlExecState::MergeDmlStats(const DmlStatsPB& src, DmlStatsPB* dst) {
+  dst->set_bytes_written(dst->bytes_written() + src.bytes_written());
+  if (src.has_kudu_stats()) {
+    KuduDmlStatsPB* kudu_stats = dst->mutable_kudu_stats();
+    kudu_stats->set_num_row_errors(
+        kudu_stats->num_row_errors() + src.kudu_stats().num_row_errors());
   }
-  if (src.__isset.parquet_stats) {
-    if (dst->__isset.parquet_stats) {
-      MergeMapValues<string, int64_t>(src.parquet_stats.per_column_size,
-          &dst->parquet_stats.per_column_size);
+  if (src.has_parquet_stats()) {
+    if (dst->has_parquet_stats()) {
+      MergeMapValues(src.parquet_stats().per_column_size(),
+          dst->mutable_parquet_stats()->mutable_per_column_size());
     } else {
-      dst->__set_parquet_stats(src.parquet_stats);
+      *dst->mutable_parquet_stats() = src.parquet_stats();
     }
   }
 }
 
 void DmlExecState::InitForKuduDml() {
   // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
-  const string& partition_name = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+  const string& partition_name = DataSink::ROOT_PARTITION_KEY;
   lock_guard<mutex> l(lock_);
   DCHECK(per_partition_status_.find(partition_name) == per_partition_status_.end());
-  TInsertPartitionStatus status;
-  status.__set_num_modified_rows(0L);
-  status.__set_id(-1L);
-  status.__isset.stats = true;
-  status.stats.__isset.kudu_stats = true;
+  DmlPartitionStatusPB status;
+  status.set_id(-1L);
+  status.set_num_modified_rows(0L);
+  status.mutable_stats()->set_bytes_written(0L);
+  status.mutable_stats()->mutable_kudu_stats()->set_num_row_errors(0L);
+  status.set_partition_base_dir("");
   per_partition_status_.insert(make_pair(partition_name, status));
 }
 
 void DmlExecState::SetKuduDmlStats(int64_t num_modified_rows, int64_t num_row_errors,
     int64_t latest_ts) {
   // For Kudu, track only one set of DML stats, so use the ROOT_PARTITION_KEY.
-  const string& partition_name = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
+  const string& partition_name = DataSink::ROOT_PARTITION_KEY;
   lock_guard<mutex> l(lock_);
   PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
   DCHECK(entry != per_partition_status_.end());
-  entry->second.__set_num_modified_rows(num_modified_rows);
-  entry->second.stats.kudu_stats.__set_num_row_errors(num_row_errors);
-  entry->second.__set_kudu_latest_observed_ts(latest_ts);
+  entry->second.set_num_modified_rows(num_modified_rows);
+  entry->second.mutable_stats()->mutable_kudu_stats()->set_num_row_errors(num_row_errors);
+  entry->second.set_kudu_latest_observed_ts(latest_ts);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/dml-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
index 728284a..5872b85 100644
--- a/be/src/runtime/dml-exec-state.h
+++ b/be/src/runtime/dml-exec-state.h
@@ -29,12 +29,12 @@
 
 namespace impala {
 
-class TInsertExecStatus;
+class DmlExecStatusPB;
+class DmlPartitionStatusPB;
+class DmlStatsPB;
 class TInsertResult;
-class TInsertStats;
 class TFinalizeParams;
 class TUpdateCatalogRequest;
-class TInsertPartitionStatus;
 class RuntimeProfile;
 class HdfsTableDescriptor;
 
@@ -57,7 +57,7 @@ class HdfsTableDescriptor;
 class DmlExecState {
  public:
   /// Merge values from 'dml_exec_status'.
-  void Update(const TInsertExecStatus& dml_exec_status);
+  void Update(const DmlExecStatusPB& dml_exec_status);
 
   /// Add a new partition with the given parameters. Ignores 'base_dir' if nullptr.
   /// It is an error to call this for an existing partition.
@@ -67,7 +67,7 @@ class DmlExecState {
   /// Ignores 'insert_stats' if nullptr.
   /// Requires that the partition already exist.
   void UpdatePartition(const std::string& partition_name,
-      int64_t num_modified_rows_delta, const TInsertStats* insert_stats);
+      int64_t num_modified_rows_delta, const DmlStatsPB* insert_stats);
 
   /// Used to initialize this state when execute Kudu DML. Must be called before
   /// SetKuduDmlStats().
@@ -102,22 +102,22 @@ class DmlExecState {
   Status FinalizeHdfsInsert(const TFinalizeParams& params, bool s3_skip_insert_staging,
       HdfsTableDescriptor* hdfs_table, RuntimeProfile* profile) WARN_UNUSED_RESULT;
 
-  // Serialize to thrift. Returns true if any fields of 'dml_status' were set.
-  bool ToThrift(TInsertExecStatus* dml_status);
+  /// Serialize to protobuf and stores the result in 'dml_status'.
+  void ToProto(DmlExecStatusPB* dml_status);
 
-  // Populates 'insert_result' with PartitionStatusMap data, for Impala's extension of
-  // Beeswax.
+  /// Populates 'insert_result' with PartitionStatusMap data, for Impala's extension of
+  /// Beeswax.
   void ToTInsertResult(TInsertResult* insert_result);
 
  private:
-  // protects all fields below
+  /// protects all fields below
   boost::mutex lock_;
 
   /// Counts how many rows an DML query has added to a particular partition (partitions
   /// are identified by their partition keys: k1=v1/k2=v2 etc. Unpartitioned tables
   /// have a single 'default' partition which is identified by ROOT_PARTITION_KEY.
   /// Uses ordered map so that iteration order is deterministic.
-  typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
+  typedef std::map<std::string, DmlPartitionStatusPB> PartitionStatusMap;
   PartitionStatusMap per_partition_status_;
 
   /// Tracks files to move from a temporary (key) to a final destination (value) as
@@ -141,7 +141,7 @@ class DmlExecState {
       PermissionCache* permissions_cache);
 
   /// Merge 'src' into 'dst'. Not thread-safe.
-  void MergeDmlStats(const TInsertStats& src, TInsertStats* dst);
+  void MergeDmlStats(const DmlStatsPB& src, DmlStatsPB* dst);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 5e15990..967e07d 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -47,6 +47,7 @@
 #include "scheduling/admission-controller.h"
 #include "scheduling/request-pool-service.h"
 #include "scheduling/scheduler.h"
+#include "service/control-service.h"
 #include "service/data-stream-service.h"
 #include "service/frontend.h"
 #include "service/impala-server.h"
@@ -307,11 +308,13 @@ Status ExecEnv::Init() {
   obj_pool_->Add(new MemTracker(negated_unused_reservation, -1,
       "Buffer Pool: Unused Reservation", mem_tracker_.get()));
 
-  // Initializes the RPCMgr and DataStreamServices.
+  // Initializes the RPCMgr, ControlServices and DataStreamServices.
   krpc_address_.__set_hostname(ip_address_);
   // Initialization needs to happen in the following order due to dependencies:
   // - RPC manager, DataStreamService and DataStreamManager.
   RETURN_IF_ERROR(rpc_mgr_->Init());
+  control_svc_.reset(new ControlService(rpc_metrics_));
+  RETURN_IF_ERROR(control_svc_->Init());
   data_svc_.reset(new DataStreamService(rpc_metrics_));
   RETURN_IF_ERROR(data_svc_->Init());
   RETURN_IF_ERROR(stream_mgr_->Init(data_svc_->mem_tracker()));

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 3832d0d..960d1ab 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -41,6 +41,8 @@ namespace impala {
 class AdmissionController;
 class BufferPool;
 class CallableThreadPool;
+class ControlService;
+class DataStreamMgr;
 class DataStreamService;
 class QueryExecMgr;
 class Frontend;
@@ -131,7 +133,6 @@ class ExecEnv {
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
   QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
   RpcMgr* rpc_mgr() const { return rpc_mgr_.get(); }
-  DataStreamService* data_svc() const { return data_svc_.get(); }
   PoolMemTrackerRegistry* pool_mem_trackers() { return pool_mem_trackers_.get(); }
   ReservationTracker* buffer_reservation() { return buffer_reservation_.get(); }
   BufferPool* buffer_pool() { return buffer_pool_.get(); }
@@ -195,6 +196,7 @@ class ExecEnv {
   boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
   boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
   boost::scoped_ptr<RpcMgr> rpc_mgr_;
+  boost::scoped_ptr<ControlService> control_svc_;
   boost::scoped_ptr<DataStreamService> data_svc_;
 
   /// Query-wide buffer pool and the root reservation tracker for the pool. The

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 700a391..d2e8359 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -49,7 +49,8 @@
 #include "util/periodic-counter-updater.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
-DEFINE_int32(status_report_interval, 5, "interval between profile reports; in seconds");
+DEFINE_int32(status_report_interval_ms, 5000,
+    "interval between profile reports; in milliseconds");
 
 using namespace impala;
 using namespace apache::thrift;
@@ -103,11 +104,11 @@ Status FragmentInstanceState::Exec() {
 done:
   if (!status.ok()) {
     if (!is_prepared) {
-      DCHECK_LE(current_state_.Load(), TFInstanceExecState::WAITING_FOR_PREPARE);
+      DCHECK_LE(current_state_.Load(), FInstanceExecStatePB::WAITING_FOR_PREPARE);
       // Tell the managing 'QueryState' that we hit an error during Prepare().
       query_state_->ErrorDuringPrepare(status, instance_id());
     } else {
-      DCHECK_GT(current_state_.Load(), TFInstanceExecState::WAITING_FOR_PREPARE);
+      DCHECK_GT(current_state_.Load(), FInstanceExecStatePB::WAITING_FOR_PREPARE);
       // Tell the managing 'QueryState' that we hit an error during execution.
       query_state_->ErrorDuringExecute(status, instance_id());
     }
@@ -127,7 +128,7 @@ void FragmentInstanceState::Cancel() {
 }
 
 Status FragmentInstanceState::Prepare() {
-  DCHECK_EQ(current_state_.Load(), TFInstanceExecState::WAITING_FOR_EXEC);
+  DCHECK_EQ(current_state_.Load(), FInstanceExecStatePB::WAITING_FOR_EXEC);
   VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx_);
 
   // Do not call RETURN_IF_ERROR or explicitly return before this line,
@@ -239,7 +240,7 @@ Status FragmentInstanceState::Prepare() {
 
   // We need to start the profile-reporting thread before calling Open(),
   // since it may block.
-  if (FLAGS_status_report_interval > 0) {
+  if (FLAGS_status_report_interval_ms > 0) {
     string thread_name = Substitute("profile-report (finst:$0)", PrintId(instance_id()));
     unique_lock<mutex> l(report_thread_lock_);
     RETURN_IF_ERROR(Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
@@ -254,7 +255,7 @@ Status FragmentInstanceState::Prepare() {
 
 Status FragmentInstanceState::Open() {
   DCHECK(!opened_promise_.IsSet());
-  DCHECK_EQ(current_state_.Load(), TFInstanceExecState::WAITING_FOR_PREPARE);
+  DCHECK_EQ(current_state_.Load(), FInstanceExecStatePB::WAITING_FOR_PREPARE);
   SCOPED_TIMER(profile()->total_time_counter());
   SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
@@ -293,7 +294,7 @@ Status FragmentInstanceState::Open() {
 }
 
 Status FragmentInstanceState::ExecInternal() {
-  DCHECK_EQ(current_state_.Load(), TFInstanceExecState::WAITING_FOR_OPEN);
+  DCHECK_EQ(current_state_.Load(), FInstanceExecStatePB::WAITING_FOR_OPEN);
   // Inject failure if debug actions are enabled.
   RETURN_IF_ERROR(DebugAction(query_state_->query_options(), "FIS_IN_EXEC_INTERNAL"));
 
@@ -372,9 +373,9 @@ void FragmentInstanceState::ReportProfileThread() {
   // 0 and the report_interval.  This way, the coordinator doesn't get all the
   // updates at once so its better for contention as well as smoother progress
   // reporting.
-  int report_fragment_offset = rand() % FLAGS_status_report_interval;
+  int report_fragment_offset = rand() % FLAGS_status_report_interval_ms;
   // We don't want to wait longer than it takes to run the entire fragment.
-  stop_report_thread_cv_.WaitFor(l, report_fragment_offset * MICROS_PER_SEC);
+  stop_report_thread_cv_.WaitFor(l, report_fragment_offset * MICROS_PER_MILLI);
 
   while (report_thread_active_) {
     // timed_wait can return because the timeout occurred or the condition variable
@@ -382,7 +383,7 @@ void FragmentInstanceState::ReportProfileThread() {
     // two cases (e.g. there is a race here where the wait timed out but before grabbing
     // the lock, the condition variable was signaled).  Instead, we will use an external
     // flag, report_thread_active_, to coordinate this.
-    stop_report_thread_cv_.WaitFor(l, FLAGS_status_report_interval * MICROS_PER_SEC);
+    stop_report_thread_cv_.WaitFor(l, FLAGS_status_report_interval_ms * MICROS_PER_MILLI);
 
     if (!report_thread_active_) break;
     SendReport(false, Status::OK());
@@ -392,6 +393,7 @@ void FragmentInstanceState::ReportProfileThread() {
 }
 
 void FragmentInstanceState::SendReport(bool done, const Status& status) {
+  DFAKE_SCOPED_LOCK(report_status_lock_);
   DCHECK(status.ok() || done);
   DCHECK(runtime_state_ != nullptr);
 
@@ -408,62 +410,62 @@ void FragmentInstanceState::SendReport(bool done, const Status& status) {
 
 void FragmentInstanceState::UpdateState(const StateEvent event)
 {
-  TFInstanceExecState::type current_state = current_state_.Load();
-  TFInstanceExecState::type next_state = current_state;
+  FInstanceExecStatePB current_state = current_state_.Load();
+  FInstanceExecStatePB next_state = current_state;
   switch (event) {
     case StateEvent::PREPARE_START:
-      DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_EXEC);
-      next_state = TFInstanceExecState::WAITING_FOR_PREPARE;
+      DCHECK_EQ(current_state, FInstanceExecStatePB::WAITING_FOR_EXEC);
+      next_state = FInstanceExecStatePB::WAITING_FOR_PREPARE;
       break;
 
     case StateEvent::CODEGEN_START:
-      DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_PREPARE);
+      DCHECK_EQ(current_state, FInstanceExecStatePB::WAITING_FOR_PREPARE);
       event_sequence_->MarkEvent("Prepare Finished");
-      next_state = TFInstanceExecState::WAITING_FOR_CODEGEN;
+      next_state = FInstanceExecStatePB::WAITING_FOR_CODEGEN;
       break;
 
     case StateEvent::OPEN_START:
-      if (current_state == TFInstanceExecState::WAITING_FOR_PREPARE) {
+      if (current_state == FInstanceExecStatePB::WAITING_FOR_PREPARE) {
         event_sequence_->MarkEvent("Prepare Finished");
       } else {
-        DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_CODEGEN);
+        DCHECK_EQ(current_state, FInstanceExecStatePB::WAITING_FOR_CODEGEN);
       }
-      next_state = TFInstanceExecState::WAITING_FOR_OPEN;
+      next_state = FInstanceExecStatePB::WAITING_FOR_OPEN;
       break;
 
     case StateEvent::WAITING_FOR_FIRST_BATCH:
-      DCHECK_EQ(current_state, TFInstanceExecState::WAITING_FOR_OPEN);
+      DCHECK_EQ(current_state, FInstanceExecStatePB::WAITING_FOR_OPEN);
       event_sequence_->MarkEvent("Open Finished");
-      next_state = TFInstanceExecState::WAITING_FOR_FIRST_BATCH;
+      next_state = FInstanceExecStatePB::WAITING_FOR_FIRST_BATCH;
       break;
 
     case StateEvent::BATCH_PRODUCED:
-      if (UNLIKELY(current_state == TFInstanceExecState::WAITING_FOR_FIRST_BATCH)) {
+      if (UNLIKELY(current_state == FInstanceExecStatePB::WAITING_FOR_FIRST_BATCH)) {
         event_sequence_->MarkEvent("First Batch Produced");
-        next_state = TFInstanceExecState::FIRST_BATCH_PRODUCED;
+        next_state = FInstanceExecStatePB::FIRST_BATCH_PRODUCED;
       } else {
-        DCHECK_EQ(current_state, TFInstanceExecState::PRODUCING_DATA);
+        DCHECK_EQ(current_state, FInstanceExecStatePB::PRODUCING_DATA);
       }
       break;
 
     case StateEvent::BATCH_SENT:
-      if (UNLIKELY(current_state == TFInstanceExecState::FIRST_BATCH_PRODUCED)) {
+      if (UNLIKELY(current_state == FInstanceExecStatePB::FIRST_BATCH_PRODUCED)) {
         event_sequence_->MarkEvent("First Batch Sent");
-        next_state = TFInstanceExecState::PRODUCING_DATA;
+        next_state = FInstanceExecStatePB::PRODUCING_DATA;
       } else {
-        DCHECK_EQ(current_state, TFInstanceExecState::PRODUCING_DATA);
+        DCHECK_EQ(current_state, FInstanceExecStatePB::PRODUCING_DATA);
       }
       break;
 
     case StateEvent::LAST_BATCH_SENT:
-      DCHECK_EQ(current_state, TFInstanceExecState::PRODUCING_DATA);
-      next_state = TFInstanceExecState::LAST_BATCH_SENT;
+      DCHECK_EQ(current_state, FInstanceExecStatePB::PRODUCING_DATA);
+      next_state = FInstanceExecStatePB::LAST_BATCH_SENT;
       break;
 
     case StateEvent::EXEC_END:
       // Allow abort in all states to make error handling easier.
       event_sequence_->MarkEvent("ExecInternal Finished");
-      next_state = TFInstanceExecState::FINISHED;
+      next_state = FInstanceExecStatePB::FINISHED;
       break;
 
     default:
@@ -518,7 +520,7 @@ void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
   runtime_state_->filter_bank()->PublishGlobalFilter(params);
 }
 
-string FragmentInstanceState::ExecStateToString(const TFInstanceExecState::type state) {
+string FragmentInstanceState::ExecStateToString(FInstanceExecStatePB state) {
   // Labels to send to the debug webpages to display the current state to the user.
   static const string finstance_state_labels[] = {
       "Waiting for Exec",         // WAITING_FOR_EXEC
@@ -530,12 +532,10 @@ string FragmentInstanceState::ExecStateToString(const TFInstanceExecState::type
       "Producing Data",           // PRODUCING_DATA
       "Last batch sent",          // LAST_BATCH_SENT
       "Finished"                  // FINISHED
-
   };
   /// Make sure we have a label for every possible state.
-  static_assert(
-      sizeof(finstance_state_labels) / sizeof(char*) == TFInstanceExecState::FINISHED + 1,
-      "");
+  static_assert(sizeof(finstance_state_labels) / sizeof(char*) ==
+      FInstanceExecStatePB::FINISHED + 1, "");
 
   DCHECK_LT(state, sizeof(finstance_state_labels) / sizeof(char*))
       << "Unknown instance state";

http://git-wip-us.apache.org/repos/asf/impala/blob/5391100c/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index d1f21f5..e4ebdce 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -28,7 +28,9 @@
 #include "common/thread-debug-info.h"
 #include "util/promise.h"
 
+#include "gen-cpp/control_service.pb.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
+#include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
 #include "runtime/row-batch.h"
 #include "util/condition-variable.h"
 #include "util/promise.h"
@@ -60,10 +62,10 @@ class RuntimeState;
 /// contains a timeline of events of the fragment instance.
 /// The FIS periodically makes a ReportExecStatus RPC to the coordinator to report the
 /// execution status, the current state of the execution, and the instance profile. The
-/// frequency of those reports is controlled by the flag status_report_interval; setting
-/// that flag to 0 disables periodic reporting altogether Regardless of the value of that
-/// flag, a report is sent at least once at the end of execution with an overall status
-/// and profile (and 'done' indicator).
+/// frequency of those reports is controlled by the flag status_report_interval_ms;
+/// Setting that flag to 0 disables periodic reporting altogether. Regardless of the value
+/// of that flag, a report is sent at least once at the end of execution with an overall
+/// status and profile (and 'done' indicator).
 /// The FIS will send at least one final status report. If execution ended with an error,
 /// that error status will be part of the final report (it will not be overridden by
 /// the resulting cancellation).
@@ -101,8 +103,8 @@ class FragmentInstanceState {
   /// the Prepare phase. May be nullptr.
   PlanRootSink* root_sink() { return root_sink_; }
 
-  /// Returns a string description of 'current_state_'.
-  static string ExecStateToString(const TFInstanceExecState::type state);
+  /// Returns a string description of 'state'.
+  static string ExecStateToString(FInstanceExecStatePB state);
 
   /// Name of the counter that is tracking per query, per host peak mem usage.
   /// TODO: this doesn't look like it belongs here
@@ -116,10 +118,15 @@ class FragmentInstanceState {
   const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; }
   const TUniqueId& query_id() const { return query_ctx().query_id; }
   const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; }
-  TFInstanceExecState::type current_state() const { return current_state_.Load(); }
+  FInstanceExecStatePB current_state() const { return current_state_.Load(); }
   const TNetworkAddress& coord_address() const { return query_ctx().coord_address; }
   ObjectPool* obj_pool();
 
+  /// Returns the monotonically increasing sequence number. Called by status report thread
+  /// only except for the final report which is handled by finstance exec thread after the
+  /// reporting thread has exited.
+  int64_t AdvanceReportSeqNo() { return ++report_seq_no_; }
+
   /// Returns true if the current thread is a thread executing the whole or part of
   /// a fragment instance.
   static bool IsFragmentExecThread() {
@@ -163,6 +170,14 @@ class FragmentInstanceState {
   /// by report_thread_lock_.
   bool report_thread_active_ = false;
 
+  /// A 'fake mutex' to detect any race condition in accessing 'report_seq_no_' below.
+  /// There should be only one thread doing status report at the same time.
+  DFAKE_MUTEX(report_status_lock_);
+
+  /// Monotonically increasing sequence number used in status report to prevent
+  /// duplicated or out-of-order reports.
+  int64_t report_seq_no_ = 0;
+
   /// Profile for timings for each stage of the plan fragment instance's lifecycle.
   /// Lives in obj_pool().
   RuntimeProfile* timings_profile_ = nullptr;
@@ -196,8 +211,7 @@ class FragmentInstanceState {
 
   /// The current state of this fragment instance's execution. Only updated by the
   /// fragment instance thread in UpdateState() and read by the profile reporting threads.
-  AtomicEnum<TFInstanceExecState::type> current_state_{
-    TFInstanceExecState::WAITING_FOR_EXEC};
+  AtomicEnum<FInstanceExecStatePB> current_state_{FInstanceExecStatePB::WAITING_FOR_EXEC};
 
   /// Output sink for rows sent to this fragment. Created in Prepare(), lives in
   /// obj_pool().
@@ -272,8 +286,10 @@ class FragmentInstanceState {
 
   /// Invoked the report callback. If 'done' is true, sends the final report with
   /// 'status' and the profile. This type of report is sent once and only by the
-  /// instance execution thread.  Otherwise, a profile-only report is sent, which the
-  /// ReportProfileThread() thread will do periodically.
+  /// instance execution thread. Otherwise, a profile-only report is sent, which the
+  /// ReportProfileThread() thread will do periodically. It's expected that only one
+  /// of instance execution thread or ReportProfileThread() should be calling this
+  /// function at a time.
   void SendReport(bool done, const Status& status);
 
   /// Handle the execution event 'event'. This implements a state machine and will update