Posted to commits@impala.apache.org by bh...@apache.org on 2018/07/24 18:01:45 UTC

[4/5] impala git commit: IMPALA-7212: Remove --use_krpc flag and remove old DataStream services

IMPALA-7212: Remove --use_krpc flag and remove old DataStream services

This change removes the --use_krpc flag, which allowed users to fall
back to the Thrift-based implementation of the DataStream services.
The flag was originally added during development of IMPALA-2567 and
has served its purpose.
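
For context on what happens to clusters that still pass --use_krpc:
the patch registers the old name via REMOVED_FLAG (see the
global-flags.cc hunk further down). The macro's definition is not part
of this diff, so the snippet below is only a minimal gflags-style
sketch of the removed-flag idea, with hypothetical names and behavior:

    #include <iostream>
    #include <string>
    #include <gflags/gflags.h>

    // Hypothetical stand-in for Impala's REMOVED_FLAG macro: keep
    // accepting the old flag name on the command line, but warn that
    // it no longer has any effect.
    static bool WarnRemovedFlag(const char* name, const std::string& value) {
      if (!value.empty()) {
        std::cerr << "Warning: --" << name << " has been removed and is ignored.\n";
      }
      return true;  // Never reject startup because of a removed flag.
    }

    DEFINE_string(use_krpc, "", "Removed flag: KRPC is now always enabled.");
    DEFINE_validator(use_krpc, &WarnRemovedFlag);

    int main(int argc, char** argv) {
      gflags::ParseCommandLineFlags(&argc, &argv, true);
      std::cout << "Started; data streams always use KRPC.\n";
      return 0;
    }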

As more of ImpalaInternalService is ported to KRPC, it is becoming
increasingly burdensome to maintain parallel implementations of the
RPC handlers. Therefore, going forward, KRPC is always enabled.
This change removes the Thrift-based implementation of the DataStream
services and also simplifies some tests that were previously skipped
when KRPC was disabled.
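
The net effect on the sink creation path (see the data-sink.cc hunk
below) is that the FLAGS_use_krpc branch disappears and the KRPC-based
sender is constructed unconditionally. A toy, self-contained model of
that simplification, using stand-in class names rather than Impala's
real types:

    #include <iostream>
    #include <memory>
    #include <string>

    // Stand-ins for the sender classes; only the KRPC flavor remains.
    class DataStreamSenderIface {
     public:
      virtual ~DataStreamSenderIface() = default;
      virtual std::string Name() const = 0;
    };

    class KrpcSender : public DataStreamSenderIface {
     public:
      std::string Name() const override { return "KrpcDataStreamSender"; }
    };

    // Before this patch the factory branched on FLAGS_use_krpc between two
    // parallel sender implementations; now it always builds the KRPC one.
    std::unique_ptr<DataStreamSenderIface> CreateStreamSink() {
      return std::make_unique<KrpcSender>();
    }

    int main() {
      std::cout << "sink = " << CreateStreamSink()->Name() << "\n";
      return 0;
    }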

Testing done: ran the core test suite against a debug build.

Change-Id: Icfed200751508478a3d728a917448f2dabfc67c3
Reviewed-on: http://gerrit.cloudera.org:8080/10835
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8d7f6386
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8d7f6386
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8d7f6386

Branch: refs/heads/master
Commit: 8d7f63865405c50981647ae3198bd4d39b465bf8
Parents: f9e7d93
Author: Michael Ho <kw...@cloudera.com>
Authored: Tue Jun 26 00:56:02 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 24 02:36:50 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc               |   3 +-
 be/src/exec/data-sink.cc                    |  16 +-
 be/src/exec/exchange-node.cc                |   8 +-
 be/src/exec/exchange-node.h                 |   4 +-
 be/src/rpc/authentication.cc                |   1 -
 be/src/rpc/rpc-mgr-kerberized-test.cc       |   3 -
 be/src/rpc/thrift-server-test.cc            |   3 -
 be/src/runtime/CMakeLists.txt               |   3 -
 be/src/runtime/backend-client.h             |  15 -
 be/src/runtime/data-stream-mgr-base.h       |  60 ---
 be/src/runtime/data-stream-mgr.h            | 241 ----------
 be/src/runtime/data-stream-recvr-base.h     |  60 ---
 be/src/runtime/data-stream-recvr.cc         | 366 ---------------
 be/src/runtime/data-stream-recvr.h          | 202 --------
 be/src/runtime/data-stream-sender.cc        | 563 -----------------------
 be/src/runtime/data-stream-sender.h         | 158 -------
 be/src/runtime/data-stream-test.cc          | 224 ++-------
 be/src/runtime/exec-env.cc                  |  56 +--
 be/src/runtime/exec-env.h                   |  12 +-
 be/src/runtime/fragment-instance-state.cc   |   6 +-
 be/src/runtime/krpc-data-stream-mgr.cc      |   6 +-
 be/src/runtime/krpc-data-stream-mgr.h       |  11 +-
 be/src/runtime/krpc-data-stream-recvr.cc    |   1 -
 be/src/runtime/krpc-data-stream-recvr.h     |   4 +-
 be/src/runtime/runtime-state.cc             |   6 +-
 be/src/runtime/runtime-state.h              |   4 +-
 be/src/scheduling/scheduler.cc              |  22 +-
 be/src/service/data-stream-service.cc       |   4 +-
 be/src/service/impala-internal-service.cc   |   9 -
 be/src/service/impala-internal-service.h    |   2 -
 be/src/service/impala-server.cc             |  38 +-
 be/src/service/impala-server.h              |   4 -
 bin/run-all-tests.sh                        |  13 -
 bin/start-impala-cluster.py                 |   5 -
 common/thrift/ImpalaInternalService.thrift  |  32 --
 tests/common/custom_cluster_test_suite.py   |   3 -
 tests/common/skip.py                        |   4 -
 tests/common/test_skip.py                   |  39 --
 tests/conftest.py                           |   4 -
 tests/custom_cluster/test_krpc_mem_usage.py |   1 -
 tests/custom_cluster/test_krpc_metrics.py   |   1 -
 tests/custom_cluster/test_rpc_exception.py  |  12 -
 tests/query_test/test_codegen.py            |   1 -
 tests/webserver/test_web_pages.py           |   1 -
 44 files changed, 88 insertions(+), 2143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 6130510..c6df084 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -37,7 +37,7 @@ DEFINE_string(hostname, "", "Hostname to use for this daemon, also used as part
 
 DEFINE_int32(be_port, 22000,
     "port on which thrift based ImpalaInternalService is exported");
-DEFINE_int32_hidden(krpc_port, 27000,
+DEFINE_int32(krpc_port, 27000,
     "port on which KRPC based ImpalaInternalService is exported");
 
 // Kerberos is enabled if and only if principal is set.
@@ -258,3 +258,4 @@ REMOVED_FLAG(use_statestore);
 REMOVED_FLAG(use_kudu_kinit);
 REMOVED_FLAG(disable_admission_control);
 REMOVED_FLAG(disable_mem_pools);
+REMOVED_FLAG(use_krpc);

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 2ca0019..5e0fc3c 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -31,14 +31,12 @@
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/strings/substitute.h"
-#include "runtime/data-stream-sender.h"
 #include "runtime/krpc-data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "util/container-util.h"
 
 #include "common/names.h"
 
-DECLARE_bool(use_krpc);
 DEFINE_int64(data_stream_sender_buffer_size, 16 * 1024,
     "(Advanced) Max size in bytes which a row batch in a data stream sender's channel "
     "can accumulate before the row batch is sent over the wire.");
@@ -66,16 +64,10 @@ Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx,
     case TDataSinkType::DATA_STREAM_SINK:
       if (!thrift_sink.__isset.stream_sink) return Status("Missing data stream sink.");
 
-      if (FLAGS_use_krpc) {
-        *sink = pool->Add(new KrpcDataStreamSender(fragment_instance_ctx.sender_id,
-            row_desc, thrift_sink.stream_sink, fragment_ctx.destinations,
-            FLAGS_data_stream_sender_buffer_size, state));
-      } else {
-        // TODO: figure out good buffer size based on size of output row
-        *sink = pool->Add(new DataStreamSender(fragment_instance_ctx.sender_id, row_desc,
-            thrift_sink.stream_sink, fragment_ctx.destinations,
-            FLAGS_data_stream_sender_buffer_size, state));
-      }
+      // TODO: figure out good buffer size based on size of output row
+      *sink = pool->Add(new KrpcDataStreamSender(fragment_instance_ctx.sender_id,
+          row_desc, thrift_sink.stream_sink, fragment_ctx.destinations,
+          FLAGS_data_stream_sender_buffer_size, state));
       break;
     case TDataSinkType::TABLE_SINK:
       if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 8884f30..07511a7 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -20,11 +20,11 @@
 #include <boost/scoped_ptr.hpp>
 
 #include "exprs/scalar-expr.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/data-stream-recvr.h"
-#include "runtime/runtime-state.h"
-#include "runtime/row-batch.h"
 #include "runtime/exec-env.h"
+#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-recvr.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/time.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/exec/exchange-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index 64fa9fe..bdc52c6 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -26,7 +26,7 @@
 
 namespace impala {
 
-class DataStreamRecvrBase;
+class KrpcDataStreamRecvr;
 class RowBatch;
 class ScalarExpr;
 class TupleRowComparator;
@@ -76,7 +76,7 @@ class ExchangeNode : public ExecNode {
   /// The underlying DataStreamRecvrBase instance. Ownership is shared between this
   /// exchange node instance and the DataStreamMgr used to create the receiver.
   /// stream_recvr_->Close() must be called before this instance is destroyed.
-  std::shared_ptr<DataStreamRecvrBase> stream_recvr_;
+  std::shared_ptr<KrpcDataStreamRecvr> stream_recvr_;
 
   /// our input rows are a prefix of the rows we produce
   RowDescriptor input_row_desc_;

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/rpc/authentication.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index 461f155..c22ab88 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -65,7 +65,6 @@ using namespace apache::thrift;
 using namespace boost::filesystem;   // for is_regular()
 using namespace strings;
 
-DECLARE_bool(use_krpc);
 DECLARE_string(keytab_file);
 DECLARE_string(principal);
 DECLARE_string(be_principal);

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
index 0f1f3bb..c6b95c8 100644
--- a/be/src/rpc/rpc-mgr-kerberized-test.cc
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -18,8 +18,6 @@
 #include "rpc/rpc-mgr-test-base.h"
 #include "service/fe-support.h"
 
-DECLARE_bool(use_krpc);
-
 DECLARE_string(be_principal);
 DECLARE_string(hostname);
 DECLARE_string(principal);
@@ -38,7 +36,6 @@ class RpcMgrKerberizedTest :
     public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
 
   virtual void SetUp() override {
-    FLAGS_use_krpc = true;
     FLAGS_principal = "dummy-service/host@realm";
     FLAGS_be_principal = strings::Substitute("$0@$1", kdc_principal, kdc_realm);
     ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index 4f97237..af867de 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -35,8 +35,6 @@ using namespace strings;
 using namespace apache::thrift;
 using apache::thrift::transport::SSLProtocol;
 
-DECLARE_bool(use_krpc);
-
 DECLARE_string(principal);
 DECLARE_string(be_principal);
 DECLARE_string(ssl_client_ca_certificate);
@@ -104,7 +102,6 @@ class ThriftKerberizedParamsTest :
 
   virtual void SetUp() override {
     KerberosSwitch k = GetParam();
-    FLAGS_use_krpc = false;
     if (k == KERBEROS_OFF) {
       FLAGS_principal.clear();
       FLAGS_be_principal.clear();

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index e09b27c..f67b8fe 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -32,9 +32,6 @@ add_library(Runtime
   client-cache.cc
   coordinator.cc
   coordinator-backend-state.cc
-  data-stream-mgr.cc
-  data-stream-recvr.cc
-  data-stream-sender.cc
   debug-options.cc
   descriptors.cc
   dml-exec-state.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/backend-client.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index 136a71e..0e35999 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -75,21 +75,6 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient {
     ImpalaInternalServiceClient::recv_CancelQueryFInstances(_return);
   }
 
-  void TransmitData(TTransmitDataResult& _return, const TTransmitDataParams& params,
-      bool* send_done) {
-    DCHECK(!*send_done);
-    FAULT_INJECTION_SEND_RPC_EXCEPTION(1024);
-    if (transmit_csw_ != NULL) {
-      SCOPED_CONCURRENT_COUNTER(transmit_csw_);
-      ImpalaInternalServiceClient::send_TransmitData(params);
-    } else {
-      ImpalaInternalServiceClient::send_TransmitData(params);
-    }
-    *send_done = true;
-    FAULT_INJECTION_RECV_RPC_EXCEPTION(1024);
-    ImpalaInternalServiceClient::recv_TransmitData(_return);
-  }
-
   /// Callers of TransmitData() should provide their own counter to measure the data
   /// transmission time.
   void SetTransmitDataCounter(RuntimeProfile::ConcurrentTimerCounter* csw) {

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h b/be/src/runtime/data-stream-mgr-base.h
deleted file mode 100644
index 6f1ec78..0000000
--- a/be/src/runtime/data-stream-mgr-base.h
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_RUNTIME_DATA_STREAM_MGR_BASE_H
-#define IMPALA_RUNTIME_DATA_STREAM_MGR_BASE_H
-
-#include "common/status.h"
-#include "runtime/bufferpool/buffer-pool.h"
-#include "runtime/descriptors.h"  // for PlanNodeId
-#include "util/aligned-new.h"
-
-namespace impala {
-
-class DataStreamRecvrBase;
-class MemTracker;
-class RuntimeProfile;
-class RuntimeState;
-class TRowBatch;
-class TUniqueId;
-
-/// Interface for a singleton class which manages all incoming data streams at a backend
-/// node.
-/// TODO: This is a temporary pure virtual base class that defines the basic interface for
-/// 2 parallel implementations of the DataStreamMgrBase, one each for Thrift and KRPC.
-/// Remove this in favor of the KRPC implementation when possible.
-class DataStreamMgrBase : public CacheLineAligned {
- public:
-  DataStreamMgrBase() {}
-
-  virtual ~DataStreamMgrBase() { }
-
-  /// Create a receiver for a specific fragment_instance_id/node_id destination;
-  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
-      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
-      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-      MemTracker* parent_tracker, BufferPool::ClientHandle* client = nullptr) = 0;
-
-  /// Closes all receivers registered for fragment_instance_id immediately.
-  virtual void Cancel(const TUniqueId& fragment_instance_id) = 0;
-
-};
-
-}
-
-#endif /* IMPALA_RUNTIME_DATA_STREAM_MGR_BASE_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
deleted file mode 100644
index f37b1b1..0000000
--- a/be/src/runtime/data-stream-mgr.h
+++ /dev/null
@@ -1,241 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_RUNTIME_DATA_STREAM_MGR_H
-#define IMPALA_RUNTIME_DATA_STREAM_MGR_H
-
-#include <list>
-#include <set>
-#include <boost/thread/mutex.hpp>
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-
-#include "common/status.h"
-#include "common/object-pool.h"
-#include "runtime/data-stream-mgr-base.h"
-#include "runtime/descriptors.h"  // for PlanNodeId
-#include "util/metrics.h"
-#include "util/promise.h"
-#include "util/runtime-profile.h"
-#include "gen-cpp/Types_types.h"  // for TUniqueId
-
-namespace impala {
-
-class DescriptorTbl;
-class DataStreamRecvr;
-class RowBatch;
-class RuntimeState;
-class TRowBatch;
-
-/// Singleton class which manages all incoming data streams at a backend node. It
-/// provides both producer and consumer functionality for each data stream.
-/// - ImpalaBackend service threads use this to add incoming data to streams
-///   in response to TransmitData rpcs (AddData()) or to signal end-of-stream conditions
-///   (CloseSender()).
-/// - Exchange nodes extract data from an incoming stream via a DataStreamRecvr,
-///   which is created with CreateRecvr().
-//
-/// DataStreamMgr also allows asynchronous cancellation of streams via Cancel()
-/// which unblocks all DataStreamRecvr::GetBatch() calls that are made on behalf
-/// of the cancelled fragment id.
-///
-/// Exposes three metrics:
-///  'senders-blocked-on-recvr-creation' - currently blocked senders.
-///  'total-senders-blocked-on-recvr-creation' - total number of blocked senders over
-///  time.
-///  'total-senders-timedout-waiting-for-recvr-creation' - total number of senders that
-///  timed-out while waiting for a receiver.
-///
-/// TODO: The recv buffers used in DataStreamRecvr should count against
-/// per-query memory limits.
-class DataStreamMgr : public DataStreamMgrBase {
- public:
-  DataStreamMgr(MetricGroup* metrics);
-  virtual ~DataStreamMgr() override;
-
-  /// Create a receiver for a specific fragment_instance_id/node_id destination;
-  /// If is_merging is true, the receiver maintains a separate queue of incoming row
-  /// batches for each sender and merges the sorted streams from each sender into a
-  /// single stream. 'parent_tracker' is the MemTracker of the exchange node which owns
-  /// this receiver. It's the parent of the MemTracker of the newly created receiver.
-  /// Ownership of the receiver is shared between this DataStream mgr instance and the
-  /// caller. 'client' is the BufferPool's client handle for allocating buffers.
-  /// It's owned by the parent exchange node.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
-      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
-      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-      MemTracker* parent_tracker, BufferPool::ClientHandle* client) override;
-
-  /// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id
-  /// if the recvr has not been cancelled. sender_id identifies the sender instance
-  /// from which the data came.
-  /// The call blocks if this ends up pushing the stream over its buffering limit;
-  /// it unblocks when the consumer removed enough data to make space for
-  /// row_batch.
-  /// TODO: enforce per-sender quotas (something like 200% of buffer_size/#senders),
-  /// so that a single sender can't flood the buffer and stall everybody else.
-  /// Returns OK if successful, error status otherwise.
-  Status AddData(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
-                 const TRowBatch& thrift_batch, int sender_id);
-
-  /// Notifies the recvr associated with the fragment/node id that the specified
-  /// sender has closed.
-  /// Returns OK if successful, error status otherwise.
-  Status CloseSender(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
-      int sender_id);
-
-  /// Closes all receivers registered for fragment_instance_id immediately.
-  void Cancel(const TUniqueId& fragment_instance_id) override;
-
- private:
-  friend class DataStreamRecvr;
-
-  /// Owned by the metric group passed into the constructor
-  MetricGroup* metrics_;
-
-  /// Current number of senders waiting for a receiver to register
-  IntGauge* num_senders_waiting_;
-
-  /// Total number of senders that have ever waited for a receiver to register
-  IntCounter* total_senders_waited_;
-
-  /// Total number of senders that timed-out waiting for a receiver to register
-  IntCounter* num_senders_timedout_;
-
-  /// protects all fields below
-  boost::mutex lock_;
-
-  /// map from hash value of fragment instance id/node id pair to stream receivers;
-  /// Ownership of the stream revcr is shared between this instance and the caller of
-  /// CreateRecvr().
-  /// we don't want to create a map<pair<TUniqueId, PlanNodeId>, DataStreamRecvr*>,
-  /// because that requires a bunch of copying of ids for lookup
-  typedef boost::unordered_multimap<uint32_t,
-      std::shared_ptr<DataStreamRecvr>> RecvrMap;
-  RecvrMap receiver_map_;
-
-  /// (Fragment instance id, Plan node id) pair that uniquely identifies a stream.
-  typedef std::pair<impala::TUniqueId, PlanNodeId> RecvrId;
-
-  /// Less-than ordering for RecvrIds.
-  struct ComparisonOp {
-    bool operator()(const RecvrId& a, const RecvrId& b) {
-      if (a.first.hi < b.first.hi) {
-        return true;
-      } else if (a.first.hi > b.first.hi) {
-        return false;
-      } else if (a.first.lo < b.first.lo) {
-        return true;
-      } else if (a.first.lo > b.first.lo) {
-        return false;
-      }
-      return a.second < b.second;
-    }
-  };
-
-  /// Ordered set of receiver IDs so that we can easily find all receivers for a given
-  /// fragment (by starting at (fragment instance id, 0) and iterating until the fragment
-  /// instance id changes), which is required for cancellation of an entire fragment.
-  ///
-  /// There is one entry in fragment_recvr_set_ for every entry in receiver_map_.
-  typedef std::set<RecvrId, ComparisonOp> FragmentRecvrSet;
-  FragmentRecvrSet fragment_recvr_set_;
-
-  /// Return the receiver for given fragment_instance_id/node_id, or NULL if not found. If
-  /// 'acquire_lock' is false, assumes lock_ is already being held and won't try to
-  /// acquire it.
-  std::shared_ptr<DataStreamRecvr> FindRecvr(const TUniqueId& fragment_instance_id,
-      PlanNodeId node_id, bool acquire_lock = true);
-
-  /// Calls FindRecvr(), but if NULL is returned, wait for up to
-  /// FLAGS_datastream_sender_timeout_ms for the receiver to be registered.  Senders may
-  /// initialise and start sending row batches before a receiver is ready. To accommodate
-  /// this, we allow senders to establish a rendezvous between them and the receiver. When
-  /// the receiver arrives, it triggers the rendezvous, and all waiting senders can
-  /// proceed. A sender that waits for too long (120s by default) will eventually time out
-  /// and abort. The output parameter 'already_unregistered' distinguishes between the two
-  /// cases in which this method returns NULL:
-  ///
-  /// 1. *already_unregistered == true: the receiver had previously arrived and was
-  /// already closed
-  ///
-  /// 2. *already_unregistered == false: the receiver has yet to arrive when this method
-  /// returns, and the timeout has expired
-  std::shared_ptr<DataStreamRecvr> FindRecvrOrWait(
-      const TUniqueId& fragment_instance_id, PlanNodeId node_id,
-      bool* already_unregistered);
-
-  /// Remove receiver block for fragment_instance_id/node_id from the map.
-  Status DeregisterRecvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id);
-
-  inline uint32_t GetHashValue(const TUniqueId& fragment_instance_id, PlanNodeId node_id);
-
-  /// The coordination primitive used to signal the arrival of a waited-for receiver
-  typedef Promise<std::shared_ptr<DataStreamRecvr>> RendezvousPromise;
-
-  /// A reference-counted promise-wrapper used to coordinate between senders and
-  /// receivers. The ref_count field tracks the number of senders waiting for the arrival
-  /// of a particular receiver. When ref_count returns to 0, the last sender has ceased
-  /// waiting (either because of a timeout, or because the receiver arrived), and the
-  /// rendezvous can be torn down.
-  ///
-  /// Access is only thread-safe when lock_ is held.
-  struct RefCountedPromise {
-    uint32_t ref_count;
-
-    // Without a conveniently copyable smart ptr, we keep a raw pointer to the promise and
-    // are careful to delete it when ref_count becomes 0.
-    RendezvousPromise* promise;
-
-    void IncRefCount() { ++ref_count; }
-
-    uint32_t DecRefCount() {
-      if (--ref_count == 0) delete promise;
-      return ref_count;
-    }
-
-    RefCountedPromise() : ref_count(0), promise(new RendezvousPromise()) { }
-  };
-
-  /// Map from stream (which identifies a receiver) to a (count, promise) pair that gives
-  /// the number of senders waiting as well as a shared promise whose value is Set() with
-  /// a pointer to the receiver when the receiver arrives. The count is used to detect
-  /// when no receivers are waiting, to initiate clean-up after the fact.
-  ///
-  /// If pending_rendezvous_[X] exists, then receiver_map_[hash(X)] and
-  /// fragment_recvr_set_[X] may exist (and vice versa), as entries are removed from
-  /// pending_rendezvous_ some time after the rendezvous is triggered by the arrival of a
-  /// matching receiver.
-  typedef boost::unordered_map<RecvrId, RefCountedPromise> RendezvousMap;
-  RendezvousMap pending_rendezvous_;
-
-  /// Map from the time, in ms, that a stream should be evicted from closed_stream_cache
-  /// to its RecvrId. Used to evict old streams from cache efficiently. multimap in case
-  /// there are multiple streams with the same eviction time.
-  typedef std::multimap<int64_t, RecvrId> ClosedStreamMap;
-  ClosedStreamMap closed_stream_expirations_;
-
-  /// Cache of recently closed RecvrIds. Used to allow straggling senders to fail fast by
-  /// checking this cache, rather than waiting for the missed-receiver timeout to elapse
-  /// in FindRecvrOrWait().
-  boost::unordered_set<RecvrId> closed_stream_cache_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-recvr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr-base.h b/be/src/runtime/data-stream-recvr-base.h
deleted file mode 100644
index e0d06fe..0000000
--- a/be/src/runtime/data-stream-recvr-base.h
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DATA_STREAM_RECVR_BASE_H
-#define IMPALA_RUNTIME_DATA_STREAM_RECVR_BASE_H
-
-#include "common/status.h"
-
-namespace impala {
-
-class RowBatch;
-class TupleRowComparator;
-
-/// Interface for a single receiver of a m:n data stream.
-/// DataStreamRecvrBase implementations should maintain one or more queues of row batches
-/// received by a DataStreamMgrBase implementation from one or more sender fragment
-/// instances.
-/// TODO: This is a temporary pure virtual base class that defines the basic interface for
-/// 2 parallel implementations of the DataStreamRecvrBase, one each for Thrift and KRPC.
-/// Remove this in favor of the KRPC implementation when possible.
-class DataStreamRecvrBase {
- public:
-  DataStreamRecvrBase() { }
-  virtual ~DataStreamRecvrBase() { }
-
-  /// Returns next row batch in data stream.
-  virtual Status GetBatch(RowBatch** next_batch) = 0;
-
-  virtual void Close() = 0;
-
-  /// Create a SortedRunMerger instance to merge rows from multiple senders according to
-  /// the specified row comparator.
-  virtual Status CreateMerger(const TupleRowComparator& less_than) = 0;
-
-  /// Fill output_batch with the next batch of rows.
-  virtual Status GetNext(RowBatch* output_batch, bool* eos) = 0;
-
-  /// Transfer all resources from the current batches being processed from each sender
-  /// queue to the specified batch.
-  virtual void TransferAllResources(RowBatch* transfer_batch) = 0;
-
-};
-
-}
-
-#endif /* IMPALA_RUNTIME_DATA_STREAM_RECVR_BASE_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
deleted file mode 100644
index c9a9ab9..0000000
--- a/be/src/runtime/data-stream-recvr.cc
+++ /dev/null
@@ -1,366 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <boost/thread/locks.hpp>
-#include <boost/thread/mutex.hpp>
-
-#include "runtime/data-stream-recvr.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
-#include "runtime/row-batch-queue.h"
-#include "runtime/sorted-run-merger.h"
-#include "util/condition-variable.h"
-#include "util/runtime-profile-counters.h"
-#include "util/periodic-counter-updater.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-// Implements a blocking queue of row batches from one or more senders. One queue
-// is maintained per sender if is_merging_ is true for the enclosing receiver, otherwise
-// rows from all senders are placed in the same queue.
-class DataStreamRecvr::SenderQueue {
- public:
-  SenderQueue(DataStreamRecvr* parent_recvr, int num_senders);
-
-  // Return the next batch from this sender queue. Sets the returned batch in cur_batch_.
-  // A returned batch that is not filled to capacity does *not* indicate
-  // end-of-stream.
-  // The call blocks until another batch arrives or all senders close.
-  // their channels. The returned batch is owned by the sender queue. The caller
-  // must acquire data from the returned batch before the next call to GetBatch().
-  Status GetBatch(RowBatch** next_batch);
-
-  // Adds a row batch to this sender queue if this stream has not been cancelled;
-  // blocks if this will make the stream exceed its buffer limit.
-  // If the total size of the batches in this queue would exceed the allowed buffer size,
-  // the queue is considered full and the call blocks until a batch is dequeued.
-  void AddBatch(const TRowBatch& batch);
-
-  // Decrement the number of remaining senders for this queue and signal eos ("new data")
-  // if the count drops to 0. The number of senders will be 1 for a merging
-  // DataStreamRecvr.
-  void DecrementSenders();
-
-  // Set cancellation flag and signal cancellation to receiver and sender. Subsequent
-  // incoming batches will be dropped.
-  void Cancel();
-
-  // Must be called once to cleanup any queued resources.
-  void Close();
-
-  // Returns the current batch from this queue being processed by a consumer.
-  RowBatch* current_batch() const { return current_batch_.get(); }
-
- private:
-  // Receiver of which this queue is a member.
-  DataStreamRecvr* recvr_;
-
-  // protects all subsequent data.
-  mutex lock_;
-
-  // if true, the receiver fragment for this stream got cancelled
-  bool is_cancelled_;
-
-  // number of senders which haven't closed the channel yet
-  // (if it drops to 0, end-of-stream is true)
-  int num_remaining_senders_;
-
-  // signal arrival of new batch or the eos/cancelled condition
-  ConditionVariable data_arrival_cv_;
-
-  // signal removal of data by stream consumer
-  ConditionVariable data_removal__cv_;
-
-  // queue of (batch length, batch) pairs.  The SenderQueue block owns memory to
-  // these batches. They are handed off to the caller via GetBatch.
-  typedef list<pair<int, RowBatch*>> RowBatchQueue;
-  RowBatchQueue batch_queue_;
-
-  // The batch that was most recently returned via GetBatch(), i.e. the current batch
-  // from this queue being processed by a consumer. Is destroyed when the next batch
-  // is retrieved.
-  scoped_ptr<RowBatch> current_batch_;
-
-  // Set to true when the first batch has been received
-  bool received_first_batch_;
-};
-
-DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders)
-  : recvr_(parent_recvr),
-    is_cancelled_(false),
-    num_remaining_senders_(num_senders),
-    received_first_batch_(false) {
-}
-
-Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
-  unique_lock<mutex> l(lock_);
-  // wait until something shows up or we know we're done
-  while (!is_cancelled_ && batch_queue_.empty() && num_remaining_senders_ > 0) {
-    VLOG_ROW << "wait arrival fragment_instance_id="
-             << PrintId(recvr_->fragment_instance_id())
-             << " node=" << recvr_->dest_node_id();
-    // Don't count time spent waiting on the sender as active time.
-    CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_);
-    CANCEL_SAFE_SCOPED_TIMER(
-        received_first_batch_ ? NULL : recvr_->first_batch_wait_total_timer_,
-        &is_cancelled_);
-    data_arrival_cv_.Wait(l);
-  }
-
-  // cur_batch_ must be replaced with the returned batch.
-  current_batch_.reset();
-  *next_batch = NULL;
-  if (is_cancelled_) return Status::CANCELLED;
-
-  if (batch_queue_.empty()) {
-    DCHECK_EQ(num_remaining_senders_, 0);
-    return Status::OK();
-  }
-
-  received_first_batch_ = true;
-
-  DCHECK(!batch_queue_.empty());
-  RowBatch* result = batch_queue_.front().second;
-  recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
-  VLOG_ROW << "fetched #rows=" << result->num_rows();
-  batch_queue_.pop_front();
-  data_removal__cv_.NotifyOne();
-  current_batch_.reset(result);
-  *next_batch = current_batch_.get();
-  return Status::OK();
-}
-
-void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
-  unique_lock<mutex> l(lock_);
-  if (is_cancelled_) return;
-
-  COUNTER_ADD(recvr_->bytes_received_counter_, RowBatch::GetSerializedSize(thrift_batch));
-  DCHECK_GT(num_remaining_senders_, 0);
-
-  // if there's something in the queue and this batch will push us over the
-  // buffer limit we need to wait until the batch gets drained.
-  // Note: It's important that we enqueue thrift_batch regardless of buffer limit if
-  // the queue is currently empty. In the case of a merging receiver, batches are
-  // received from a specific queue based on data order, and the pipeline will stall
-  // if the merger is waiting for data from an empty queue that cannot be filled because
-  // the limit has been reached.
-  int64_t batch_size = RowBatch::GetDeserializedSize(thrift_batch);
-  while (!batch_queue_.empty() && recvr_->ExceedsLimit(batch_size) && !is_cancelled_) {
-    CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_total_timer_, &is_cancelled_);
-    VLOG_ROW << " wait removal: empty=" << (batch_queue_.empty() ? 1 : 0)
-             << " #buffered=" << recvr_->num_buffered_bytes_.Load()
-             << " batch_size=" << batch_size << "\n";
-
-    // We only want one thread running the timer at any one time. Only
-    // one thread may lock the try_lock, and that 'winner' starts the
-    // scoped timer.
-    bool got_timer_lock = false;
-    {
-      try_mutex::scoped_try_lock timer_lock(recvr_->buffer_wall_timer_lock_);
-      if (timer_lock) {
-        CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_wall_timer_, &is_cancelled_);
-        data_removal__cv_.Wait(l);
-        got_timer_lock = true;
-      } else {
-        data_removal__cv_.Wait(l);
-        got_timer_lock = false;
-      }
-    }
-    // If we had the timer lock, wake up another writer to make sure
-    // that they (if no-one else) starts the timer. The guarantee is
-    // that if no thread has the try_lock, the thread that we wake up
-    // here will obtain it and run the timer.
-    //
-    // We must have given up the try_lock by this point, otherwise the
-    // woken thread might not successfully take the lock once it has
-    // woken up. (In fact, no other thread will run in AddBatch until
-    // this thread exits because of mutual exclusion around lock_, but
-    // it's good not to rely on that fact).
-    //
-    // The timer may therefore be an underestimate by the amount of
-    // time it takes this thread to finish (and yield lock_) and the
-    // notified thread to be woken up and to acquire the try_lock. In
-    // practice, this time is small relative to the total wait time.
-    if (got_timer_lock) data_removal__cv_.NotifyOne();
-  }
-
-  if (!is_cancelled_) {
-    RowBatch* batch = NULL;
-    {
-      SCOPED_TIMER(recvr_->deserialize_row_batch_timer_);
-      // Note: if this function makes a row batch, the batch *must* be added
-      // to batch_queue_. It is not valid to create the row batch and destroy
-      // it in this thread.
-      batch = new RowBatch(recvr_->row_desc_, thrift_batch, recvr_->mem_tracker());
-    }
-    VLOG_ROW << "added #rows=" << batch->num_rows()
-             << " batch_size=" << batch_size << "\n";
-    batch_queue_.push_back(make_pair(batch_size, batch));
-    recvr_->num_buffered_bytes_.Add(batch_size);
-    data_arrival_cv_.NotifyOne();
-  }
-}
-
-void DataStreamRecvr::SenderQueue::DecrementSenders() {
-  lock_guard<mutex> l(lock_);
-  DCHECK_GT(num_remaining_senders_, 0);
-  num_remaining_senders_ = max(0, num_remaining_senders_ - 1);
-  VLOG_FILE << "decremented senders: fragment_instance_id="
-            << PrintId(recvr_->fragment_instance_id())
-            << " node_id=" << recvr_->dest_node_id()
-            << " #senders=" << num_remaining_senders_;
-  if (num_remaining_senders_ == 0) data_arrival_cv_.NotifyOne();
-}
-
-void DataStreamRecvr::SenderQueue::Cancel() {
-  {
-    lock_guard<mutex> l(lock_);
-    if (is_cancelled_) return;
-    is_cancelled_ = true;
-    VLOG_QUERY << "cancelled stream: fragment_instance_id_="
-               << PrintId(recvr_->fragment_instance_id())
-               << " node_id=" << recvr_->dest_node_id();
-  }
-  // Wake up all threads waiting to produce/consume batches.  They will all
-  // notice that the stream is cancelled and handle it.
-  data_arrival_cv_.NotifyAll();
-  data_removal__cv_.NotifyAll();
-}
-
-void DataStreamRecvr::SenderQueue::Close() {
-  lock_guard<mutex> l(lock_);
-  // Note that the queue must be cancelled first before it can be closed or we may
-  // risk running into a race which can leak row batches. Please see IMPALA-3034.
-  DCHECK(is_cancelled_);
-  // Delete any batches queued in batch_queue_
-  for (RowBatchQueue::iterator it = batch_queue_.begin();
-      it != batch_queue_.end(); ++it) {
-    delete it->second;
-  }
-  current_batch_.reset();
-}
-
-Status DataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
-  DCHECK(is_merging_);
-  vector<SortedRunMerger::RunBatchSupplierFn> input_batch_suppliers;
-  input_batch_suppliers.reserve(sender_queues_.size());
-
-  // Create the merger that will a single stream of sorted rows.
-  merger_.reset(new SortedRunMerger(less_than, row_desc_, profile_, false));
-
-  for (int i = 0; i < sender_queues_.size(); ++i) {
-    input_batch_suppliers.push_back(
-        bind(mem_fn(&SenderQueue::GetBatch), sender_queues_[i], _1));
-  }
-  RETURN_IF_ERROR(merger_->Prepare(input_batch_suppliers));
-  return Status::OK();
-}
-
-void DataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
-  for (SenderQueue* sender_queue: sender_queues_) {
-    if (sender_queue->current_batch() != NULL) {
-      sender_queue->current_batch()->TransferResourceOwnership(transfer_batch);
-    }
-  }
-}
-
-DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
-    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-    PlanNodeId dest_node_id, int num_senders, bool is_merging, int64_t total_buffer_limit,
-    RuntimeProfile* parent_profile)
-  : mgr_(stream_mgr),
-    fragment_instance_id_(fragment_instance_id),
-    dest_node_id_(dest_node_id),
-    total_buffer_limit_(total_buffer_limit),
-    row_desc_(row_desc),
-    is_merging_(is_merging),
-    num_buffered_bytes_(0),
-    profile_(parent_profile->CreateChild("DataStreamReceiver")) {
-  // Create one queue per sender if is_merging is true.
-  int num_queues = is_merging ? num_senders : 1;
-  sender_queues_.reserve(num_queues);
-  int num_sender_per_queue = is_merging ? 1 : num_senders;
-  for (int i = 0; i < num_queues; ++i) {
-    SenderQueue* queue = sender_queue_pool_.Add(new SenderQueue(this,
-        num_sender_per_queue));
-    sender_queues_.push_back(queue);
-  }
-
-  mem_tracker_.reset(new MemTracker(profile_, -1, "DataStreamRecvr", parent_tracker));
-
-  // Initialize the counters
-  bytes_received_counter_ = ADD_COUNTER(profile_, "BytesReceived", TUnit::BYTES);
-  bytes_received_time_series_counter_ =
-      ADD_TIME_SERIES_COUNTER(profile_, "BytesReceived", bytes_received_counter_);
-  deserialize_row_batch_timer_ = ADD_TIMER(profile_, "DeserializeRowBatchTimer");
-  buffer_full_wall_timer_ = ADD_TIMER(profile_, "SendersBlockedTimer");
-  buffer_full_total_timer_ = ADD_TIMER(profile_, "SendersBlockedTotalTimer(*)");
-  data_arrival_timer_ = profile_->inactive_timer();
-  first_batch_wait_total_timer_ = ADD_TIMER(profile_, "FirstBatchArrivalWaitTime");
-}
-
-Status DataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
-  DCHECK(merger_.get() != NULL);
-  return merger_->GetNext(output_batch, eos);
-}
-
-void DataStreamRecvr::AddBatch(const TRowBatch& thrift_batch, int sender_id) {
-  int use_sender_id = is_merging_ ? sender_id : 0;
-  // Add all batches to the same queue if is_merging_ is false.
-  sender_queues_[use_sender_id]->AddBatch(thrift_batch);
-}
-
-void DataStreamRecvr::RemoveSender(int sender_id) {
-  int use_sender_id = is_merging_ ? sender_id : 0;
-  sender_queues_[use_sender_id]->DecrementSenders();
-}
-
-void DataStreamRecvr::CancelStream() {
-  for (int i = 0; i < sender_queues_.size(); ++i) {
-    sender_queues_[i]->Cancel();
-  }
-}
-
-void DataStreamRecvr::Close() {
-  // Remove this receiver from the DataStreamMgr that created it.
-  const Status status = mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id());
-  if (!status.ok()) {
-    LOG(WARNING) << "Error deregistering receiver: " << status.GetDetail();
-  }
-  mgr_ = NULL;
-  for (int i = 0; i < sender_queues_.size(); ++i) {
-    sender_queues_[i]->Close();
-  }
-  merger_.reset();
-  mem_tracker_->Close();
-  profile_->StopPeriodicCounters();
-}
-
-DataStreamRecvr::~DataStreamRecvr() {
-  DCHECK(mgr_ == NULL) << "Must call Close()";
-}
-
-Status DataStreamRecvr::GetBatch(RowBatch** next_batch) {
-  DCHECK(!is_merging_);
-  DCHECK_EQ(sender_queues_.size(), 1);
-  return sender_queues_[0]->GetBatch(next_batch);
-}
-
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
deleted file mode 100644
index 37e8f70..0000000
--- a/be/src/runtime/data-stream-recvr.h
+++ /dev/null
@@ -1,202 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DATA_STREAM_RECVR_H
-#define IMPALA_RUNTIME_DATA_STREAM_RECVR_H
-
-#include "data-stream-recvr-base.h"
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread/mutex.hpp>
-
-#include "common/atomic.h"
-#include "common/object-pool.h"
-#include "common/status.h"
-#include "gen-cpp/Types_types.h"   // for TUniqueId
-#include "runtime/descriptors.h"
-#include "util/runtime-profile.h"
-
-namespace impala {
-
-class DataStreamMgr;
-class SortedRunMerger;
-class MemTracker;
-class RowBatch;
-class TRowBatch;
-class TupleRowCompare;
-
-/// Single receiver of an m:n data stream.
-/// This is for use by the DataStreamMgr, which is the implementation of the abstract
-/// class DataStreamMgrBase that depends on Thrift.
-/// DataStreamRecvr maintains one or more queues of row batches received by a
-/// DataStreamMgr from one or more sender fragment instances.
-/// Receivers are created via DataStreamMgr::CreateRecvr().
-/// Ownership of a stream recvr is shared between the DataStreamMgr that created it and
-/// the caller of DataStreamMgr::CreateRecvr() (i.e. the exchange node)
-//
-/// The is_merging_ member determines if the recvr merges input streams from different
-/// sender fragment instances according to a specified sort order.
-/// If is_merging_ = false : Only one batch queue is maintained for row batches from all
-/// sender fragment instances. These row batches are returned one at a time via
-/// GetBatch().
-/// If is_merging_ is true : One queue is created for the batches from each distinct
-/// sender. A SortedRunMerger instance must be created via CreateMerger() prior to
-/// retrieving any rows from the receiver. Rows are retrieved from the receiver via
-/// GetNext(RowBatch* output_batch, int limit, bool eos). After the final call to
-/// GetNext(), TransferAllResources() must be called to transfer resources from the input
-/// batches from each sender to the caller's output batch.
-/// The receiver sets deep_copy to false on the merger - resources are transferred from
-/// the input batches from each sender queue to the merger to the output batch by the
-/// merger itself as it processes each run.
-//
-/// DataStreamRecvr::Close() must be called by the caller of CreateRecvr() to remove the
-/// recvr instance from the tracking structure of its DataStreamMgr in all cases.
-class DataStreamRecvr : public DataStreamRecvrBase {
- public:
-  virtual ~DataStreamRecvr() override;
-
-  /// Returns next row batch in data stream; blocks if there aren't any.
-  /// Retains ownership of the returned batch. The caller must acquire data from the
-  /// returned batch before the next call to GetBatch(). A NULL returned batch indicated
-  /// eos. Must only be called if is_merging_ is false.
-  /// TODO: This is currently only exposed to the non-merging version of the exchange.
-  /// Refactor so both merging and non-merging exchange use GetNext(RowBatch*, bool* eos).
-  Status GetBatch(RowBatch** next_batch) override;
-
-  /// Deregister from DataStreamMgr instance, which shares ownership of this instance.
-  void Close() override;
-
-  /// Create a SortedRunMerger instance to merge rows from multiple sender according to the
-  /// specified row comparator. Fetches the first batches from the individual sender
-  /// queues. The exprs used in less_than must have already been prepared and opened.
-  Status CreateMerger(const TupleRowComparator& less_than) override;
-
-  /// Fill output_batch with the next batch of rows obtained by merging the per-sender
-  /// input streams. Must only be called if is_merging_ is true.
-  Status GetNext(RowBatch* output_batch, bool* eos) override;
-
-  /// Transfer all resources from the current batches being processed from each sender
-  /// queue to the specified batch.
-  void TransferAllResources(RowBatch* transfer_batch) override;
-
-  const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
-  PlanNodeId dest_node_id() const { return dest_node_id_; }
-  const RowDescriptor& row_desc() const { return *row_desc_; }
-  MemTracker* mem_tracker() const { return mem_tracker_.get(); }
-
- private:
-  friend class DataStreamMgr;
-  class SenderQueue;
-
-  DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, bool is_merging,
-      int64_t total_buffer_limit, RuntimeProfile* parent_profile);
-
-  /// Add a new batch of rows to the appropriate sender queue, blocking if the queue is
-  /// full. Called from DataStreamMgr.
-  void AddBatch(const TRowBatch& thrift_batch, int sender_id);
-
-  /// Indicate that a particular sender is done. Delegated to the appropriate
-  /// sender queue. Called from DataStreamMgr.
-  void RemoveSender(int sender_id);
-
-  /// Empties the sender queues and notifies all waiting consumers of cancellation.
-  void CancelStream();
-
-  /// Return true if the addition of a new batch of size 'batch_size' would exceed the
-  /// total buffer limit.
-  bool ExceedsLimit(int64_t batch_size) {
-    return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
-  }
-
-  /// DataStreamMgr instance used to create this recvr. (Not owned)
-  DataStreamMgr* mgr_;
-
-  /// Fragment and node id of the destination exchange node this receiver is used by.
-  TUniqueId fragment_instance_id_;
-  PlanNodeId dest_node_id_;
-
-  /// soft upper limit on the total amount of buffering allowed for this stream across
-  /// all sender queues. we stop acking incoming data once the amount of buffered data
-  /// exceeds this value
-  int64_t total_buffer_limit_;
-
-  /// Row schema.
-  const RowDescriptor* row_desc_;
-
-  /// True if this reciver merges incoming rows from different senders. Per-sender
-  /// row batch queues are maintained in this case.
-  bool is_merging_;
-
-  /// total number of bytes held across all sender queues.
-  AtomicInt64 num_buffered_bytes_;
-
-  /// Memtracker for batches in the sender queue(s).
-  boost::scoped_ptr<MemTracker> mem_tracker_;
-
-  /// One or more queues of row batches received from senders. If is_merging_ is true,
-  /// there is one SenderQueue for each sender. Otherwise, row batches from all senders
-  /// are placed in the same SenderQueue. The SenderQueue instances are owned by the
-  /// receiver and placed in sender_queue_pool_.
-  std::vector<SenderQueue*> sender_queues_;
-
-  /// SortedRunMerger used to merge rows from different senders.
-  boost::scoped_ptr<SortedRunMerger> merger_;
-
-  /// Pool of sender queues.
-  ObjectPool sender_queue_pool_;
-
-  /// Runtime profile storing the counters below. Child of 'parent_profile' passed into
-  /// constructor.
-  RuntimeProfile* const profile_;
-
-  /// Number of bytes received
-  RuntimeProfile::Counter* bytes_received_counter_;
-
-  /// Time series of number of bytes received, samples bytes_received_counter_
-  RuntimeProfile::TimeSeriesCounter* bytes_received_time_series_counter_;
-
-  RuntimeProfile::Counter* deserialize_row_batch_timer_;
-
-  /// Time spent waiting until the first batch arrives across all queues.
-  /// TODO: Turn this into a wall-clock timer.
-  RuntimeProfile::Counter* first_batch_wait_total_timer_;
-
-  /// Total time (summed across all threads) spent waiting for the
-  /// recv buffer to be drained so that new batches can be
-  /// added. Remote plan fragments are blocked for the same amount of
-  /// time.
-  RuntimeProfile::Counter* buffer_full_total_timer_;
-
-  /// Protects access to buffer_full_wall_timer_. We only want one
-  /// thread to be running the timer at any time, and we use this
-  /// try_mutex to enforce this condition. If a thread does not get
-  /// the lock, it continues to execute, but without running the
-  /// timer.
-  boost::try_mutex buffer_wall_timer_lock_;
-
-  /// Wall time senders spend waiting for the recv buffer to have capacity.
-  RuntimeProfile::Counter* buffer_full_wall_timer_;
-
-  /// Total time spent waiting for data to arrive in the recv buffer
-  RuntimeProfile::Counter* data_arrival_timer_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
deleted file mode 100644
index ad500e9..0000000
--- a/be/src/runtime/data-stream-sender.cc
+++ /dev/null
@@ -1,563 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/data-stream-sender.h"
-
-#include <iostream>
-#include <thrift/protocol/TDebugProtocol.h>
-
-#include "common/logging.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/descriptors.h"
-#include "runtime/tuple-row.h"
-#include "runtime/row-batch.h"
-#include "runtime/raw-value.inline.h"
-#include "runtime/runtime-state.h"
-#include "runtime/client-cache.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/backend-client.h"
-#include "util/aligned-new.h"
-#include "util/condition-variable.h"
-#include "util/debug-util.h"
-#include "util/network-util.h"
-#include "util/thread-pool.h"
-#include "rpc/thrift-client.h"
-#include "rpc/thrift-util.h"
-
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-
-#include "common/names.h"
-
-using namespace apache::thrift;
-using namespace apache::thrift::protocol;
-using namespace apache::thrift::transport;
-
-namespace impala {
-
-// A channel sends data asynchronously via calls to TransmitData
-// to a single destination ipaddress/node.
-// It has a fixed-capacity buffer and allows the caller either to add rows to
-// that buffer individually (AddRow()), or circumvent the buffer altogether and send
-// TRowBatches directly (SendBatch()). Either way, there can only be one in-flight RPC
-// at any one time (ie, sending will block if the most recent rpc hasn't finished,
-// which allows the receiver node to throttle the sender by withholding acks).
-// *Not* thread-safe.
-class DataStreamSender::Channel : public CacheLineAligned {
- public:
-  // Create channel to send data to particular ipaddress/port/query/node
-  // combination. buffer_size is specified in bytes and is a soft limit on
-  // how much tuple data is accumulated before being sent; it only applies
-  // when data is added via AddRow() and not sent directly via SendBatch().
-  Channel(DataStreamSender* parent, const RowDescriptor* row_desc,
-      const TNetworkAddress& destination, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int buffer_size)
-    : parent_(parent),
-      buffer_size_(buffer_size),
-      row_desc_(row_desc),
-      address_(MakeNetworkAddress(destination.hostname, destination.port)),
-      fragment_instance_id_(fragment_instance_id),
-      dest_node_id_(dest_node_id),
-      num_data_bytes_sent_(0),
-      rpc_thread_("DataStreamSender", "SenderThread", 1, 1,
-          bind<void>(mem_fn(&Channel::TransmitData), this, _1, _2), true),
-      rpc_in_flight_(false) {}
-
-  // Initialize channel.
-  // Returns OK if successful, error indication otherwise.
-  Status Init(RuntimeState* state) WARN_UNUSED_RESULT;
-
-  // Copies a single row into this channel's output buffer and flushes buffer
-  // if it reaches capacity.
-  // Returns error status if any of the preceding rpcs failed, OK otherwise.
-  Status ALWAYS_INLINE AddRow(TupleRow* row) WARN_UNUSED_RESULT;
-
-  // Asynchronously sends a row batch.
-  // Returns the status of the most recently finished TransmitData
-  // rpc, or OK if no completed rpc remains unreported.
-  Status SendBatch(TRowBatch* batch) WARN_UNUSED_RESULT;
-
-  // Return status of last TransmitData rpc (initiated by the most recent call
-  // to either SendBatch() or SendCurrentBatch()).
-  Status GetSendStatus() WARN_UNUSED_RESULT;
-
-  // Waits for the rpc thread pool to finish the current rpc.
-  void WaitForRpc();
-
-  // Drain and shutdown the rpc thread and free the row batch allocation.
-  void Teardown(RuntimeState* state);
-
-  // Flushes any buffered row batches and sends the EOS RPC to close the channel.
-  Status FlushAndSendEos(RuntimeState* state) WARN_UNUSED_RESULT;
-
-  int64_t num_data_bytes_sent() const { return num_data_bytes_sent_; }
-  TRowBatch* thrift_batch() { return &thrift_batch_; }
-
- private:
-  DataStreamSender* parent_;
-  int buffer_size_;
-
-  const RowDescriptor* row_desc_;
-  TNetworkAddress address_;
-  TUniqueId fragment_instance_id_;
-  PlanNodeId dest_node_id_;
-
-  // the number of TRowBatch.data bytes sent successfully
-  int64_t num_data_bytes_sent_;
-
-  // we're accumulating rows into this batch
-  scoped_ptr<RowBatch> batch_;
-  TRowBatch thrift_batch_;
-
-  // We want to reuse the rpc thread to prevent creating a thread per rowbatch.
-  // TODO: currently we only have one batch in flight, but we should buffer more
-  // batches. This is a bit tricky since the channels share the outgoing batch
-  // pointer, so we need some mechanism to coordinate when the batch is all done.
-  // TODO: if the order of row batches does not matter, we can consider increasing
-  // the number of threads.
-  ThreadPool<TRowBatch*> rpc_thread_; // sender thread.
-  ConditionVariable rpc_done_cv_;   // signaled when rpc_in_flight_ is set to false.
-  mutex rpc_thread_lock_; // Lock with rpc_done_cv_ protecting rpc_in_flight_
-  bool rpc_in_flight_;  // true if the rpc_thread_ is busy sending.
-
-  Status rpc_status_;  // status of most recently finished TransmitData rpc
-  RuntimeState* runtime_state_;
-
-  // Serialize batch_ into thrift_batch_ and send via SendBatch().
-  // Returns SendBatch() status.
-  Status SendCurrentBatch();
-
-  // Synchronously call TransmitData() on a client from impalad_client_cache and
-  // update rpc_status_ based on return value (or set to error if RPC failed).
-  // Called from a thread from the rpc_thread_ pool.
-  void TransmitData(int thread_id, const TRowBatch*);
-  void TransmitDataHelper(const TRowBatch*);
-
-  // Send RPC and retry waiting for response if get RPC timeout error.
-  Status DoTransmitDataRpc(ImpalaBackendConnection* client,
-      const TTransmitDataParams& params, TTransmitDataResult* res);
-};
-
-Status DataStreamSender::Channel::Init(RuntimeState* state) {
-  RETURN_IF_ERROR(rpc_thread_.Init());
-  runtime_state_ = state;
-  // TODO: figure out how to size batch_
-  int capacity = max(1, buffer_size_ / max(row_desc_->GetRowSize(), 1));
-  batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));
-  return Status::OK();
-}
-
-Status DataStreamSender::Channel::SendBatch(TRowBatch* batch) {
-  VLOG_ROW << "Channel::SendBatch() fragment_instance_id="
-           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
-           << " #rows=" << batch->num_rows;
-  // return if the previous batch saw an error
-  RETURN_IF_ERROR(GetSendStatus());
-  {
-    unique_lock<mutex> l(rpc_thread_lock_);
-    rpc_in_flight_ = true;
-  }
-  if (!rpc_thread_.Offer(batch)) {
-    unique_lock<mutex> l(rpc_thread_lock_);
-    rpc_in_flight_ = false;
-  }
-  return Status::OK();
-}
-
-void DataStreamSender::Channel::TransmitData(int thread_id, const TRowBatch* batch) {
-  DCHECK(rpc_in_flight_);
-  TransmitDataHelper(batch);
-
-  {
-    unique_lock<mutex> l(rpc_thread_lock_);
-    rpc_in_flight_ = false;
-  }
-  rpc_done_cv_.NotifyOne();
-}
-
-void DataStreamSender::Channel::TransmitDataHelper(const TRowBatch* batch) {
-  DCHECK(batch != NULL);
-  VLOG_ROW << "Channel::TransmitData() fragment_instance_id="
-           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
-           << " #rows=" << batch->num_rows;
-  TTransmitDataParams params;
-  params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_dest_fragment_instance_id(fragment_instance_id_);
-  params.__set_dest_node_id(dest_node_id_);
-  params.__set_row_batch(*batch);  // yet another copy
-  params.__set_eos(false);
-  params.__set_sender_id(parent_->sender_id_);
-
-  ImpalaBackendConnection client(runtime_state_->impalad_client_cache(),
-      address_, &rpc_status_);
-  if (!rpc_status_.ok()) return;
-
-  TTransmitDataResult res;
-  client->SetTransmitDataCounter(parent_->thrift_transmit_timer_);
-  rpc_status_ = DoTransmitDataRpc(&client, params, &res);
-  client->ResetTransmitDataCounter();
-  if (!rpc_status_.ok()) return;
-  COUNTER_ADD(parent_->profile_->total_time_counter(),
-      parent_->thrift_transmit_timer_->LapTime());
-
-  if (res.status.status_code != TErrorCode::OK) {
-    rpc_status_ = res.status;
-  } else {
-    num_data_bytes_sent_ += RowBatch::GetSerializedSize(*batch);
-    VLOG_ROW << "incremented #data_bytes_sent="
-             << num_data_bytes_sent_;
-  }
-}
-
-Status DataStreamSender::Channel::DoTransmitDataRpc(ImpalaBackendConnection* client,
-    const TTransmitDataParams& params, TTransmitDataResult* res) {
-  Status status = client->DoRpc(&ImpalaBackendClient::TransmitData, params, res);
-  while (status.code() == TErrorCode::RPC_RECV_TIMEOUT &&
-      !runtime_state_->is_cancelled()) {
-    status = client->RetryRpcRecv(&ImpalaBackendClient::recv_TransmitData, res);
-  }
-  return status;
-}
-
-void DataStreamSender::Channel::WaitForRpc() {
-  SCOPED_TIMER(parent_->state_->total_network_send_timer());
-  unique_lock<mutex> l(rpc_thread_lock_);
-  while (rpc_in_flight_) {
-    rpc_done_cv_.Wait(l);
-  }
-}
-
-inline Status DataStreamSender::Channel::AddRow(TupleRow* row) {
-  if (batch_->AtCapacity()) {
-    // batch_ is full, let's send it; but first wait for an ongoing
-    // transmission to finish before modifying thrift_batch_
-    RETURN_IF_ERROR(SendCurrentBatch());
-  }
-  TupleRow* dest = batch_->GetRow(batch_->AddRow());
-  const vector<TupleDescriptor*>& descs = row_desc_->tuple_descriptors();
-  for (int i = 0; i < descs.size(); ++i) {
-    if (UNLIKELY(row->GetTuple(i) == NULL)) {
-      dest->SetTuple(i, NULL);
-    } else {
-      dest->SetTuple(i, row->GetTuple(i)->DeepCopy(*descs[i], batch_->tuple_data_pool()));
-    }
-  }
-  batch_->CommitLastRow();
-  return Status::OK();
-}
-
-Status DataStreamSender::Channel::SendCurrentBatch() {
-  // make sure there's no in-flight TransmitData() call that might still want to
-  // access thrift_batch_
-  WaitForRpc();
-  RETURN_IF_ERROR(parent_->SerializeBatch(batch_.get(), &thrift_batch_));
-  batch_->Reset();
-  RETURN_IF_ERROR(SendBatch(&thrift_batch_));
-  return Status::OK();
-}
-
-Status DataStreamSender::Channel::GetSendStatus() {
-  WaitForRpc();
-  if (!rpc_status_.ok()) {
-    LOG(ERROR) << "channel send to " << TNetworkAddressToString(address_) << " failed "
-               << "(fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
-               << rpc_status_.GetDetail();
-  }
-  return rpc_status_;
-}
-
-Status DataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
-  VLOG_RPC << "Channel::FlushAndSendEos() fragment_instance_id="
-           << PrintId(fragment_instance_id_) << " dest_node=" << dest_node_id_
-           << " #rows= " << batch_->num_rows();
-
-  // We can return an error here without going on to send the EOS RPC: the error we
-  // return is reported to the coordinator, which then cancels all the remote
-  // fragments, including the one this sender is sending to.
-  if (batch_->num_rows() > 0) {
-    // flush
-    RETURN_IF_ERROR(SendCurrentBatch());
-  }
-
-  RETURN_IF_ERROR(GetSendStatus());
-
-  Status client_cnxn_status;
-  ImpalaBackendConnection client(runtime_state_->impalad_client_cache(),
-      address_, &client_cnxn_status);
-  RETURN_IF_ERROR(client_cnxn_status);
-
-  TTransmitDataParams params;
-  params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_dest_fragment_instance_id(fragment_instance_id_);
-  params.__set_dest_node_id(dest_node_id_);
-  params.__set_sender_id(parent_->sender_id_);
-  params.__set_eos(true);
-  TTransmitDataResult res;
-
-  VLOG_RPC << "calling TransmitData(eos=true) to terminate channel.";
-  rpc_status_ = DoTransmitDataRpc(&client, params, &res);
-  if (!rpc_status_.ok()) {
-    LOG(ERROR) << "Failed to send EOS to " << TNetworkAddressToString(address_)
-               << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
-               << rpc_status_.GetDetail();
-    return rpc_status_;
-  }
-  return Status(res.status);
-}
-
-void DataStreamSender::Channel::Teardown(RuntimeState* state) {
-  // FlushAndSendEos() should have been called before Teardown(), so all the data
-  // should already be drained. DrainAndShutdown() is called here only to shut down
-  // the rpc thread.
-  rpc_thread_.DrainAndShutdown();
-  batch_.reset();
-}
-
-DataStreamSender::DataStreamSender(int sender_id, const RowDescriptor* row_desc,
-    const TDataStreamSink& sink, const vector<TPlanFragmentDestination>& destinations,
-    int per_channel_buffer_size, RuntimeState* state)
-  : DataSink(row_desc,
-        Substitute("DataStreamSender (dst_id=$0)", sink.dest_node_id), state),
-    sender_id_(sender_id),
-    partition_type_(sink.output_partition.type),
-    current_channel_idx_(0),
-    flushed_(false),
-    current_thrift_batch_(&thrift_batch1_),
-    serialize_batch_timer_(NULL),
-    thrift_transmit_timer_(NULL),
-    bytes_sent_counter_(NULL),
-    total_sent_rows_counter_(NULL),
-    dest_node_id_(sink.dest_node_id),
-    next_unknown_partition_(0) {
-  DCHECK_GT(destinations.size(), 0);
-  DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
-      || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
-      || sink.output_partition.type == TPartitionType::RANDOM
-      || sink.output_partition.type == TPartitionType::KUDU);
-  // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable)
-  for (int i = 0; i < destinations.size(); ++i) {
-    channels_.push_back(
-        new Channel(this, row_desc, destinations[i].thrift_backend,
-            destinations[i].fragment_instance_id, sink.dest_node_id,
-            per_channel_buffer_size));
-  }
-
-  if (partition_type_ == TPartitionType::UNPARTITIONED
-      || partition_type_ == TPartitionType::RANDOM) {
-    // Randomize the order we open/transmit to channels to avoid thundering herd problems.
-    random_shuffle(channels_.begin(), channels_.end());
-  }
-}
-
-DataStreamSender::~DataStreamSender() {
-  // TODO: check that sender was either already closed() or there was an error
-  // on some channel
-  for (int i = 0; i < channels_.size(); ++i) {
-    delete channels_[i];
-  }
-}
-
-Status DataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
-    const TDataSink& tsink, RuntimeState* state) {
-  DCHECK(tsink.__isset.stream_sink);
-  if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
-      partition_type_ == TPartitionType::KUDU) {
-    RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
-        *row_desc_, state, &partition_exprs_));
-  }
-  return Status::OK();
-}
-
-Status DataStreamSender::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
-  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
-  state_ = state;
-  SCOPED_TIMER(profile_->total_time_counter());
-  RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_exprs_, state,
-      state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(),
-      &partition_expr_evals_));
-  bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
-  uncompressed_bytes_counter_ =
-      ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
-  serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
-  thrift_transmit_timer_ =
-      profile()->AddConcurrentTimerCounter("TransmitDataRPCTime", TUnit::TIME_NS);
-  network_throughput_ =
-      profile()->AddDerivedCounter("NetworkThroughput(*)", TUnit::BYTES_PER_SECOND,
-          bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
-                                       thrift_transmit_timer_));
-  overall_throughput_ =
-      profile()->AddDerivedCounter("OverallThroughput", TUnit::BYTES_PER_SECOND,
-           bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
-                         profile()->total_time_counter()));
-
-  total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsReturned", TUnit::UNIT);
-  for (int i = 0; i < channels_.size(); ++i) {
-    RETURN_IF_ERROR(channels_[i]->Init(state));
-  }
-  return Status::OK();
-}
-
-Status DataStreamSender::Open(RuntimeState* state) {
-  return ScalarExprEvaluator::Open(partition_expr_evals_, state);
-}
-
-Status DataStreamSender::Send(RuntimeState* state, RowBatch* batch) {
-  DCHECK(!closed_);
-  DCHECK(!flushed_);
-
-  if (batch->num_rows() == 0) return Status::OK();
-  if (partition_type_ == TPartitionType::UNPARTITIONED || channels_.size() == 1) {
-    // current_thrift_batch_ is *not* the one that was written by the last call
-    // to Serialize()
-    RETURN_IF_ERROR(SerializeBatch(batch, current_thrift_batch_, channels_.size()));
-    // SendBatch() will block if there are still in-flight rpcs (and those will
-    // reference the previously written thrift batch)
-    for (int i = 0; i < channels_.size(); ++i) {
-      RETURN_IF_ERROR(channels_[i]->SendBatch(current_thrift_batch_));
-    }
-    current_thrift_batch_ =
-        (current_thrift_batch_ == &thrift_batch1_ ? &thrift_batch2_ : &thrift_batch1_);
-  } else if (partition_type_ == TPartitionType::RANDOM) {
-    // Round-robin batches among channels. Wait for the current channel to finish its
-    // rpc before overwriting its batch.
-    Channel* current_channel = channels_[current_channel_idx_];
-    current_channel->WaitForRpc();
-    RETURN_IF_ERROR(SerializeBatch(batch, current_channel->thrift_batch()));
-    RETURN_IF_ERROR(current_channel->SendBatch(current_channel->thrift_batch()));
-    current_channel_idx_ = (current_channel_idx_ + 1) % channels_.size();
-  } else if (partition_type_ == TPartitionType::KUDU) {
-    DCHECK_EQ(partition_expr_evals_.size(), 1);
-    int num_channels = channels_.size();
-    const int num_rows = batch->num_rows();
-    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
-    int channel_ids[hash_batch_size];
-
-    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
-      const int batch_window_size = min(num_rows - batch_start, hash_batch_size);
-      for (int i = 0; i < batch_window_size; ++i) {
-        TupleRow* row = batch->GetRow(i + batch_start);
-        int32_t partition =
-            *reinterpret_cast<int32_t*>(partition_expr_evals_[0]->GetValue(row));
-        if (partition < 0) {
-          // This row doesn't correspond to a partition,
-          //  e.g. it's outside the given ranges.
-          partition = next_unknown_partition_;
-          ++next_unknown_partition_;
-        }
-        channel_ids[i] = partition % num_channels;
-      }
-
-      for (int i = 0; i < batch_window_size; ++i) {
-        TupleRow* row = batch->GetRow(i + batch_start);
-        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
-      }
-    }
-  } else {
-    DCHECK(partition_type_ == TPartitionType::HASH_PARTITIONED);
-    // hash-partition batch's rows across channels
-    // TODO: encapsulate this in an Expr as we've done for Kudu above and remove this case
-    // once we have codegen here.
-    int num_channels = channels_.size();
-    const int num_partition_exprs = partition_exprs_.size();
-    const int num_rows = batch->num_rows();
-    const int hash_batch_size = RowBatch::HASH_BATCH_SIZE;
-    int channel_ids[hash_batch_size];
-
-    // Break the loop into two parts to break the data dependency between computing
-    // the hash and calling AddRow().
-    // To keep the stack allocation small, rows are processed in chunks of
-    // RowBatch::HASH_BATCH_SIZE.
-    for (int batch_start = 0; batch_start < num_rows; batch_start += hash_batch_size) {
-      int batch_window_size = min(num_rows - batch_start, hash_batch_size);
-      for (int i = 0; i < batch_window_size; ++i) {
-        TupleRow* row = batch->GetRow(i + batch_start);
-        uint64_t hash_val = EXCHANGE_HASH_SEED;
-        for (int j = 0; j < num_partition_exprs; ++j) {
-          ScalarExprEvaluator* eval = partition_expr_evals_[j];
-          void* partition_val = eval->GetValue(row);
-          // We can't use the crc hash function here because it does not result in
-          // uncorrelated hashes with different seeds. Instead we use FastHash.
-          // TODO: fix crc hash/GetHashValue()
-          DCHECK(&(eval->root()) == partition_exprs_[j]);
-          hash_val = RawValue::GetHashValueFastHash(
-              partition_val, partition_exprs_[j]->type(), hash_val);
-        }
-        channel_ids[i] = hash_val % num_channels;
-      }
-
-      for (int i = 0; i < batch_window_size; ++i) {
-        TupleRow* row = batch->GetRow(i + batch_start);
-        RETURN_IF_ERROR(channels_[channel_ids[i]]->AddRow(row));
-      }
-    }
-  }
-  COUNTER_ADD(total_sent_rows_counter_, batch->num_rows());
-  expr_results_pool_->Clear();
-  RETURN_IF_ERROR(state->CheckQueryState());
-  return Status::OK();
-}
-
-Status DataStreamSender::FlushFinal(RuntimeState* state) {
-  DCHECK(!flushed_);
-  DCHECK(!closed_);
-  flushed_ = true;
-  for (int i = 0; i < channels_.size(); ++i) {
-    // If we hit an error here, we can return without closing the remaining channels as
-    // the error is propagated back to the coordinator, which in turn cancels the query,
-    // which will cause the remaining open channels to be closed.
-    RETURN_IF_ERROR(channels_[i]->FlushAndSendEos(state));
-  }
-  return Status::OK();
-}
-
-void DataStreamSender::Close(RuntimeState* state) {
-  if (closed_) return;
-  for (int i = 0; i < channels_.size(); ++i) {
-    channels_[i]->Teardown(state);
-  }
-  ScalarExprEvaluator::Close(partition_expr_evals_, state);
-  ScalarExpr::Close(partition_exprs_);
-  DataSink::Close(state);
-}
-
-Status DataStreamSender::SerializeBatch(
-    RowBatch* src, TRowBatch* dest, int num_receivers) {
-  VLOG_ROW << "serializing " << src->num_rows() << " rows";
-  {
-    SCOPED_TIMER(profile_->total_time_counter());
-    SCOPED_TIMER(serialize_batch_timer_);
-    RETURN_IF_ERROR(src->Serialize(dest));
-    int64_t bytes = RowBatch::GetSerializedSize(*dest);
-    int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest);
-    COUNTER_ADD(bytes_sent_counter_, bytes * num_receivers);
-    COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * num_receivers);
-  }
-  return Status::OK();
-}
-
-int64_t DataStreamSender::GetNumDataBytesSent() const {
-  // TODO: do we need synchronization here or are reads & writes to 8-byte ints
-  // atomic?
-  int64_t result = 0;
-  for (int i = 0; i < channels_.size(); ++i) {
-    result += channels_[i]->num_data_bytes_sent();
-  }
-  return result;
-}
-
-}
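
The flow-control core of the removed Channel is worth calling out: at most one
TransmitData RPC is in flight per channel, so a receiver that withholds its ack
throttles the sender. Below is a simplified, hypothetical sketch of that handshake,
using a detached worker thread in place of the single-thread ThreadPool the real code
used and ignoring teardown; the names are illustrative only.

    #include <condition_variable>
    #include <functional>
    #include <mutex>
    #include <thread>

    // Hypothetical sketch of the one-RPC-in-flight handshake.
    class OneInFlightChannel {
     public:
      // Hands 'rpc' to a worker; blocks first if the previous RPC is still running.
      void SendAsync(std::function<void()> rpc) {
        WaitForRpc();  // the removed SendBatch() instead checks the previous rpc status
        {
          std::lock_guard<std::mutex> l(lock_);
          rpc_in_flight_ = true;
        }
        std::thread([this, rpc] {
          rpc();  // the actual TransmitData call would go here
          {
            std::lock_guard<std::mutex> l(lock_);
            rpc_in_flight_ = false;  // cleared before signaling, as in TransmitData()
          }
          rpc_done_cv_.notify_one();
        }).detach();
      }

      // Blocks until no RPC is in flight; this is what throttles the sender.
      void WaitForRpc() {
        std::unique_lock<std::mutex> l(lock_);
        rpc_done_cv_.wait(l, [this] { return !rpc_in_flight_; });
      }

     private:
      std::mutex lock_;
      std::condition_variable rpc_done_cv_;
      bool rpc_in_flight_ = false;
    };

On top of this handshake, the removed sender double-buffers the serialized TRowBatch
for broadcast (thrift_batch1_/thrift_batch2_), so one copy can be filled while the
other is still being transmitted.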

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h
deleted file mode 100644
index 37b9417..0000000
--- a/be/src/runtime/data-stream-sender.h
+++ /dev/null
@@ -1,158 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_RUNTIME_DATA_STREAM_SENDER_H
-#define IMPALA_RUNTIME_DATA_STREAM_SENDER_H
-
-#include <vector>
-#include <string>
-
-#include "exec/data-sink.h"
-#include "common/global-types.h"
-#include "common/object-pool.h"
-#include "common/status.h"
-#include "util/runtime-profile.h"
-#include "gen-cpp/Results_types.h" // for TRowBatch
-
-namespace impala {
-
-class RowBatch;
-class RowDescriptor;
-
-class MemTracker;
-class TDataStreamSink;
-class TNetworkAddress;
-class TPlanFragmentDestination;
-
-/// Single sender of an m:n data stream.
-/// Row batch data is routed to destinations based on the provided
-/// partitioning specification.
-/// *Not* thread-safe.
-//
-/// TODO: capture stats that describe distribution of rows/data volume
-/// across channels.
-/// TODO: create a PlanNode equivalent class for DataSink.
-class DataStreamSender : public DataSink {
- public:
-  /// Construct a sender according to the output specification (sink),
-  /// sending to the given destinations. sender_id identifies this
-  /// sender instance, and is unique within a fragment.
-  /// 'per_channel_buffer_size' is the buffer size, in bytes, allocated to each
-  /// channel.
-  /// The RowDescriptor must live until Close() is called.
-  /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
-  /// RANDOM, and KUDU.
-  DataStreamSender(int sender_id, const RowDescriptor* row_desc,
-      const TDataStreamSink& tsink,
-      const std::vector<TPlanFragmentDestination>& destinations,
-      int per_channel_buffer_size, RuntimeState* state);
-
-  virtual ~DataStreamSender();
-
-  /// Must be called before other API calls, and before the codegen'd IR module is
-  /// compiled (i.e. in an ExecNode's Prepare() function).
-  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
-
-  /// Must be called before Send() or Close(), and after the codegen'd IR module is
-  /// compiled (i.e. in an ExecNode's Open() function).
-  virtual Status Open(RuntimeState* state);
-
-  /// Flush all buffered data and close all existing channels to destination hosts.
-  /// Further Send() calls are illegal after calling FlushFinal().
-  /// FlushFinal() may be called at most once.
-  virtual Status FlushFinal(RuntimeState* state);
-
-  /// Send data in 'batch' to destination nodes according to partitioning
-  /// specification provided in c'tor.
-  /// Blocks until all rows in batch are placed in their appropriate outgoing
-  /// buffers (ie, blocks if there are still in-flight rpcs from the last
-  /// Send() call).
-  virtual Status Send(RuntimeState* state, RowBatch* batch);
-
-  /// Shutdown all existing channels to destination hosts. Further FlushFinal() calls are
-  /// illegal after calling Close().
-  virtual void Close(RuntimeState* state);
-
-  /// Serializes the src batch into the dest thrift batch. Maintains metrics.
-  /// num_receivers is the number of receivers this batch will be sent to. Only
-  /// used to maintain metrics.
-  Status SerializeBatch(RowBatch* src, TRowBatch* dest, int num_receivers = 1);
-
- protected:
-  friend class DataStreamTest;
-
-  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
-      const TDataSink& tsink, RuntimeState* state);
-
-  /// Return total number of bytes sent in TRowBatch.data. If batches are
-  /// broadcast to multiple receivers, they are counted once per receiver.
-  int64_t GetNumDataBytesSent() const;
-
- private:
-  class Channel;
-
-  /// Sender instance id, unique within a fragment.
-  int sender_id_;
-  RuntimeState* state_;
-  TPartitionType::type partition_type_; // The type of partitioning to perform.
-  int current_channel_idx_; // index of the next channel to send to for RANDOM partitioning
-
-  /// If true, this sender has called FlushFinal() successfully.
-  /// Not valid to call Send() anymore.
-  bool flushed_;
-
-  /// serialized batches for broadcasting; we need two so we can write
-  /// one while the other one is still being sent
-  TRowBatch thrift_batch1_;
-  TRowBatch thrift_batch2_;
-  TRowBatch* current_thrift_batch_;  // the next one to fill in Send()
-
-  std::vector<Channel*> channels_;
-
-  /// Expressions of the partition keys, used to compute the
-  /// per-row partition values for a shuffling exchange.
-  std::vector<ScalarExpr*> partition_exprs_;
-  std::vector<ScalarExprEvaluator*> partition_expr_evals_;
-
-  RuntimeProfile::Counter* serialize_batch_timer_;
-  /// The concurrent wall time spent sending data over the network.
-  RuntimeProfile::ConcurrentTimerCounter* thrift_transmit_timer_;
-  RuntimeProfile::Counter* bytes_sent_counter_;
-  RuntimeProfile::Counter* uncompressed_bytes_counter_;
-  RuntimeProfile::Counter* total_sent_rows_counter_;
-
-  /// Throughput per time spent in TransmitData
-  RuntimeProfile::Counter* network_throughput_;
-
-  /// Throughput per total time spent in sender
-  RuntimeProfile::Counter* overall_throughput_;
-
-  /// Identifier of the destination plan node.
-  PlanNodeId dest_node_id_;
-
-  /// Used for Kudu partitioning to round-robin rows that don't correspond to a partition
-  /// or when errors are encountered.
-  int next_unknown_partition_;
-
-  /// An arbitrary hash seed used for exchanges.
-  static constexpr uint64_t EXCHANGE_HASH_SEED = 0x66bd68df22c3ef37;
-};
-
-}
-
-#endif
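
To round out the picture, the hash-partitioned routing the removed Send() performed
boils down to: seed a hash with EXCHANGE_HASH_SEED, fold in each partition
expression's value, and take the result modulo the number of channels. The toy sketch
below mirrors that last step; the combiner is only a stand-in for
RawValue::GetHashValueFastHash(), and the helper names are hypothetical.

    #include <cstdint>
    #include <vector>

    // Seed value taken from the removed DataStreamSender.
    constexpr uint64_t kExchangeHashSeed = 0x66bd68df22c3ef37ULL;

    // Illustrative seeded combiner; the real code uses FastHash because crc-based
    // hashing does not produce uncorrelated results for different seeds.
    inline uint64_t CombineHash(uint64_t seed, uint64_t value_hash) {
      return seed ^ (value_hash + 0x9e3779b97f4a7c15ULL + (seed << 6) + (seed >> 2));
    }

    // Picks the destination channel for one row given hashes of its partition-key
    // values, mirroring "channel_ids[i] = hash_val % num_channels" in Send().
    inline int PickChannel(const std::vector<uint64_t>& key_hashes, int num_channels) {
      uint64_t hash_val = kExchangeHashSeed;
      for (uint64_t h : key_hashes) hash_val = CombineHash(hash_val, h);
      return static_cast<int>(hash_val % num_channels);
    }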