You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2018/07/24 18:01:44 UTC

[3/5] impala git commit: IMPALA-7212: Removes --use_krpc flag and removes old DataStream services

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 9f866ba..c1f9cc6 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -30,14 +30,10 @@
 #include "rpc/rpc-mgr.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
-#include "runtime/data-stream-mgr-base.h"
-#include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/krpc-data-stream-sender.h"
-#include "runtime/data-stream-sender.h"
-#include "runtime/data-stream-recvr-base.h"
-#include "runtime/data-stream-recvr.h"
 #include "runtime/descriptors.h"
 #include "runtime/client-cache.h"
 #include "runtime/backend-client.h"
@@ -55,8 +51,6 @@
 #include "util/test-info.h"
 #include "util/tuple-row-compare.h"
 #include "gen-cpp/data_stream_service.pb.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
 #include "gen-cpp/Descriptors_types.h"
 #include "service/fe-support.h"
@@ -82,8 +76,6 @@ DECLARE_int32(datastream_service_num_deserialization_threads);
 DECLARE_int32(datastream_service_deserialization_queue_size);
 DECLARE_string(datastream_service_queue_mem_limit);
 
-DECLARE_bool(use_krpc);
-
 static const PlanNodeId DEST_NODE_ID = 1;
 static const int BATCH_CAPACITY = 100;  // rows
 static const int PER_ROW_DATA = 8;
@@ -93,39 +85,6 @@ static const int SHORT_SERVICE_QUEUE_MEM_LIMIT = 16;
 
 namespace impala {
 
-// This class acts as a service interface for all Thrift related communication within
-// this test file.
-class ImpalaThriftTestBackend : public ImpalaInternalServiceIf {
- public:
-  ImpalaThriftTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {}
-  virtual ~ImpalaThriftTestBackend() {}
-
-  virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
-      const TExecQueryFInstancesParams& params) {}
-  virtual void CancelQueryFInstances(TCancelQueryFInstancesResult& return_val,
-      const TCancelQueryFInstancesParams& params) {}
-  virtual void ReportExecStatus(TReportExecStatusResult& return_val,
-      const TReportExecStatusParams& params) {}
-  virtual void UpdateFilter(TUpdateFilterResult& return_val,
-      const TUpdateFilterParams& params) {}
-  virtual void PublishFilter(TPublishFilterResult& return_val,
-      const TPublishFilterParams& params) {}
-
-  virtual void TransmitData(
-      TTransmitDataResult& return_val, const TTransmitDataParams& params) {
-    if (!params.eos) {
-      mgr_->AddData(params.dest_fragment_instance_id, params.dest_node_id,
-                    params.row_batch, params.sender_id).SetTStatus(&return_val);
-    } else {
-      mgr_->CloseSender(params.dest_fragment_instance_id, params.dest_node_id,
-          params.sender_id).SetTStatus(&return_val);
-    }
-  }
-
- private:
-  DataStreamMgr* mgr_;
-};
-
 // This class acts as a service interface for all KRPC related communication within
 // this test file.
 class ImpalaKRPCTestBackend : public DataStreamServiceIf {
@@ -166,18 +125,7 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
   unique_ptr<MemTracker> mem_tracker_;
 };
 
-template <class T> class DataStreamTestBase : public T {
- protected:
-  virtual void SetUp() {}
-  virtual void TearDown() {}
-};
-
-enum KrpcSwitch {
-  USE_THRIFT,
-  USE_KRPC
-};
-
-class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwitch>> {
+class DataStreamTest : public testing::Test {
  protected:
   DataStreamTest() : next_val_(0) {
     // Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
@@ -186,9 +134,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   ~DataStreamTest() { runtime_state_->ReleaseResources(); }
 
   virtual void SetUp() {
-    // Initialize MemTrackers and RuntimeState for use by the data stream receiver.
-    FLAGS_use_krpc = GetParam() == USE_KRPC;
-
     exec_env_.reset(new ExecEnv());
     ABORT_IF_ERROR(exec_env_->InitForFeTests());
     exec_env_->InitBufferPool(32 * 1024, 1024 * 1024 * 1024, 32 * 1024);
@@ -234,14 +179,10 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     hash_sink_.output_partition.__isset.partition_exprs = true;
     hash_sink_.output_partition.partition_exprs.push_back(expr);
 
-    if (GetParam() == USE_THRIFT) {
-      StartThriftBackend();
-    } else {
-      IpAddr ip;
-      ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-      krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
-      StartKrpcBackend();
-    }
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
+    StartKrpcBackend();
   }
 
   const TDataSink GetSink(TPartitionType::type partition_type) {
@@ -267,12 +208,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     less_than_->Close(runtime_state_.get());
     ScalarExpr::Close(ordering_exprs_);
     mem_pool_->FreeAll();
-    if (GetParam() == USE_THRIFT) {
-      exec_env_->impalad_client_cache()->TestShutdown();
-      StopThriftBackend();
-    } else {
-      StopKrpcBackend();
-    }
+    StopKrpcBackend();
     exec_env_->buffer_pool()->DeregisterClient(&buffer_pool_client_);
   }
 
@@ -312,8 +248,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   unique_ptr<ImpalaKRPCTestBackend> test_service_;
 
   // receiving node
-  DataStreamMgrBase* stream_mgr_ = nullptr;
-  ThriftServer* server_ = nullptr;
+  KrpcDataStreamMgr* stream_mgr_ = nullptr;
 
   // sending node(s)
   TDataStreamSink broadcast_sink_;
@@ -335,7 +270,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     int receiver_num;
 
     unique_ptr<thread> thread_handle;
-    shared_ptr<DataStreamRecvrBase> stream_recvr;
+    shared_ptr<KrpcDataStreamRecvr> stream_recvr;
     Status status;
     int num_rows_received = 0;
     multiset<int64_t> data_values;
@@ -360,9 +295,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     dest.fragment_instance_id = next_instance_id_;
     dest.thrift_backend.hostname = "localhost";
     dest.thrift_backend.port = FLAGS_port;
-    if (GetParam() == USE_KRPC) {
-      dest.__set_krpc_backend(krpc_address_);
-    }
+    dest.__set_krpc_backend(krpc_address_);
     *instance_id = next_instance_id_;
     ++next_instance_id_.lo;
   }
@@ -524,7 +457,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
           // hash-partitioned streams send values to the right partition
           int64_t value = *j;
           uint64_t hash_val = RawValue::GetHashValueFastHash(&value, TYPE_BIGINT,
-              DataStreamSender::EXCHANGE_HASH_SEED);
+              KrpcDataStreamSender::EXCHANGE_HASH_SEED);
           EXPECT_EQ(hash_val % receiver_info_.size(), info->receiver_num);
         }
       }
@@ -550,21 +483,9 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     }
   }
 
-  // Start Thrift based backend in separate thread.
-  void StartThriftBackend() {
-    // Dynamic cast stream_mgr_ which is of type DataStreamMgrBase to derived type
-    // DataStreamMgr, since ImpalaThriftTestBackend() accepts only DataStreamMgr*.
-    boost::shared_ptr<ImpalaThriftTestBackend> handler(
-        new ImpalaThriftTestBackend(exec_env_->ThriftStreamMgr()));
-    boost::shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler));
-    ThriftServerBuilder builder("DataStreamTest backend", processor, FLAGS_port);
-    ASSERT_OK(builder.Build(&server_));
-    ASSERT_OK(server_->Start());
-  }
-
   void StartKrpcBackend() {
     RpcMgr* rpc_mgr = exec_env_->rpc_mgr();
-    KrpcDataStreamMgr* krpc_stream_mgr = exec_env_->KrpcStreamMgr();
+    KrpcDataStreamMgr* krpc_stream_mgr = exec_env_->stream_mgr();
     ASSERT_OK(rpc_mgr->Init());
     test_service_.reset(new ImpalaKRPCTestBackend(rpc_mgr, krpc_stream_mgr,
         exec_env_->process_mem_tracker()));
@@ -573,12 +494,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     ASSERT_OK(rpc_mgr->StartServices(krpc_address_));
   }
 
-  void StopThriftBackend() {
-    VLOG_QUERY << "stop backend\n";
-    server_->StopForTesting();
-    delete server_;
-  }
-
   void StopKrpcBackend() {
     exec_env_->rpc_mgr()->Shutdown();
   }
@@ -590,7 +505,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     sender_info_.emplace_back(make_unique<SenderInfo>());
     sender_info_.back()->thread_handle.reset(
         new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
-                   partition_type, GetParam() == USE_THRIFT));
+            partition_type));
   }
 
   void JoinSenders() {
@@ -600,8 +515,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     }
   }
 
-  void Sender(int sender_num,
-      int channel_buffer_size, TPartitionType::type partition_type, bool is_thrift) {
+  void Sender(
+      int sender_num, int channel_buffer_size, TPartitionType::type partition_type) {
     RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
@@ -615,18 +530,10 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     TExpr output_exprs;
     output_exprs.nodes.push_back(expr_node);
 
-    if (is_thrift) {
-      sender.reset(new DataStreamSender(
-          sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
-      EXPECT_OK(static_cast<DataStreamSender*>(
-          sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
-    } else {
-      sender.reset(new KrpcDataStreamSender(
-          sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
-      EXPECT_OK(static_cast<KrpcDataStreamSender*>(
-          sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
-    }
-
+    sender.reset(new KrpcDataStreamSender(
+        sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
+    EXPECT_OK(static_cast<KrpcDataStreamSender*>(
+        sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
     EXPECT_OK(sender->Prepare(&state, &tracker_));
     EXPECT_OK(sender->Open(&state));
     scoped_ptr<RowBatch> batch(CreateRowBatch());
@@ -641,13 +548,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     VLOG_QUERY << "closing sender" << sender_num;
     info->status.MergeStatus(sender->FlushFinal(&state));
     sender->Close(&state);
-    if (is_thrift) {
-      info->num_bytes_sent = static_cast<DataStreamSender*>(
-          sender.get())->GetNumDataBytesSent();
-    } else {
-      info->num_bytes_sent = static_cast<KrpcDataStreamSender*>(
-          sender.get())->GetNumDataBytesSent();
-    }
+    info->num_bytes_sent = static_cast<KrpcDataStreamSender*>(
+        sender.get())->GetNumDataBytesSent();
 
     batch->Reset();
     state.ReleaseResources();
@@ -672,18 +574,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   }
 };
 
-// A seperate class for tests that are required to be run against Thrift only.
-class DataStreamTestThriftOnly : public DataStreamTest {
- protected:
-  virtual void SetUp() {
-    DataStreamTest::SetUp();
-  }
-
-  virtual void TearDown() {
-    DataStreamTest::TearDown();
-  }
-};
-
 // A seperate test class which simulates the behavior in which deserialization queue
 // fills up and all deserialization threads are busy.
 class DataStreamTestShortDeserQueue : public DataStreamTest {
@@ -715,19 +605,7 @@ class DataStreamTestShortServiceQueue : public DataStreamTest {
   }
 };
 
-INSTANTIATE_TEST_CASE_P(ThriftOrKrpc, DataStreamTest,
-    ::testing::Values(USE_KRPC, USE_THRIFT));
-
-INSTANTIATE_TEST_CASE_P(ThriftOnly, DataStreamTestThriftOnly,
-    ::testing::Values(USE_THRIFT));
-
-INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestShortDeserQueue,
-    ::testing::Values(USE_KRPC));
-
-INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestShortServiceQueue,
-    ::testing::Values(USE_KRPC));
-
-TEST_P(DataStreamTest, UnknownSenderSmallResult) {
+TEST_F(DataStreamTest, UnknownSenderSmallResult) {
   // starting a sender w/o a corresponding receiver results in an error. No bytes should
   // be sent.
   // case 1: entire query result fits in single buffer
@@ -738,7 +616,7 @@ TEST_P(DataStreamTest, UnknownSenderSmallResult) {
   EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
 }
 
-TEST_P(DataStreamTest, UnknownSenderLargeResult) {
+TEST_F(DataStreamTest, UnknownSenderLargeResult) {
   // case 2: query result requires multiple buffers
   TUniqueId dummy_id;
   GetNextInstanceId(&dummy_id);
@@ -747,7 +625,7 @@ TEST_P(DataStreamTest, UnknownSenderLargeResult) {
   EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
 }
 
-TEST_P(DataStreamTest, Cancel) {
+TEST_F(DataStreamTest, Cancel) {
   TUniqueId instance_id;
   StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, &instance_id);
   stream_mgr_->Cancel(instance_id);
@@ -758,7 +636,7 @@ TEST_P(DataStreamTest, Cancel) {
   EXPECT_TRUE(receiver_info_[1]->status.IsCancelled());
 }
 
-TEST_P(DataStreamTest, BasicTest) {
+TEST_F(DataStreamTest, BasicTest) {
   // TODO: also test that all client connections have been returned
   TPartitionType::type stream_types[] =
       {TPartitionType::UNPARTITIONED, TPartitionType::RANDOM,
@@ -781,54 +659,6 @@ TEST_P(DataStreamTest, BasicTest) {
   }
 }
 
-// This test checks for the avoidance of IMPALA-2931, which is a crash that would occur if
-// the parent memtracker of a DataStreamRecvr's memtracker was deleted before the
-// DataStreamRecvr was destroyed. The fix was to move decoupling the child tracker from
-// the parent into DataStreamRecvr::Close() which should always be called before the
-// parent is destroyed. In practice the parent is a member of the query's runtime state.
-//
-// TODO: Make lifecycle requirements more explicit.
-TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
-  scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), exec_env_.get()));
-  RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
-
-  // Start just one receiver.
-  TUniqueId instance_id;
-  GetNextInstanceId(&instance_id);
-  shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(row_desc_,
-      instance_id, DEST_NODE_ID, 1, 1, false, profile, &tracker_, nullptr);
-
-  // Perform tear down, but keep a reference to the receiver so that it is deleted last
-  // (to confirm that the destructor does not access invalid state after tear-down).
-  stream_recvr->Close();
-
-  // Force deletion of the parent memtracker by destroying it's owning runtime state.
-  runtime_state->ReleaseResources();
-  runtime_state.reset();
-
-  // Send an eos RPC to the receiver. Not required for tear-down, but confirms that the
-  // RPC does not cause an error (the receiver will still be called, since it is only
-  // Close()'d, not deleted from the data stream manager).
-  Status rpc_status;
-  ImpalaBackendConnection client(exec_env_->impalad_client_cache(),
-      MakeNetworkAddress("localhost", FLAGS_port), &rpc_status);
-  EXPECT_OK(rpc_status);
-  TTransmitDataParams params;
-  params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_eos(true);
-  params.__set_dest_fragment_instance_id(instance_id);
-  params.__set_dest_node_id(DEST_NODE_ID);
-  TUniqueId dummy_id;
-  params.__set_sender_id(0);
-
-  TTransmitDataResult result;
-  rpc_status = client.DoRpc(&ImpalaBackendClient::TransmitData, params, &result);
-
-  // Finally, stream_recvr destructor happens here. Before fix for IMPALA-2931, this
-  // would have resulted in a crash.
-  stream_recvr.reset();
-}
-
 // This test is to exercise a previously present deadlock path which is now fixed, to
 // ensure that the deadlock does not happen anymore. It does this by doing the following:
 // This test starts multiple senders to send to the same receiver. It makes sure that
@@ -841,7 +671,7 @@ TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
 // already being deserialized will be waiting on the KrpcDataStreamMgr::lock_ as well.
 // But the first thread will never release the lock since it's stuck on Offer(), causing
 // a deadlock. This is fixed with IMPALA-6346.
-TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
+TEST_F(DataStreamTestShortDeserQueue, TestNoDeadlock) {
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
 
@@ -874,7 +704,7 @@ TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
 }
 
 // Test that payloads larger than the service queue's soft mem limit can be transmitted.
-TEST_P(DataStreamTestShortServiceQueue, TestLargePayload) {
+TEST_F(DataStreamTestShortServiceQueue, TestLargePayload) {
   TestStream(
       TPartitionType::UNPARTITIONED, 4, 1, SHORT_SERVICE_QUEUE_MEM_LIMIT * 2, false);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 349b817..319e948 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -35,7 +35,6 @@
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
-#include "runtime/data-stream-mgr.h"
 #include "runtime/hbase-table-factory.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/disk-io-mgr.h"
@@ -80,8 +79,6 @@ DEFINE_int32(state_store_subscriber_port, 23000,
     "port where StatestoreSubscriberService should be exported");
 DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
-DEFINE_bool(use_krpc, true, "If true, use KRPC for the DataStream subsystem. "
-    "Otherwise use Thrift RPC.");
 
 DEFINE_bool_hidden(use_local_catalog, false,
   "Use experimental implementation of a local catalog. If this is set, "
@@ -159,15 +156,10 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, backend_port)) {
 
-  if (FLAGS_use_krpc) {
-    VLOG_QUERY << "Using KRPC.";
-    // KRPC relies on resolved IP address. It's set in Init().
-    krpc_address_.__set_port(krpc_port);
-    rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
-    stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
-  } else {
-    stream_mgr_.reset(new DataStreamMgr(metrics_.get()));
-  }
+  // KRPC relies on resolved IP address. It's set in Init().
+  krpc_address_.__set_port(krpc_port);
+  rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
+  stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
 
   request_pool_service_.reset(new RequestPoolService(metrics_.get()));
 
@@ -298,19 +290,17 @@ Status ExecEnv::Init() {
       "Buffer Pool: Unused Reservation", mem_tracker_.get()));
 
   // Initializes the RPCMgr and DataStreamServices.
-  if (FLAGS_use_krpc) {
-    krpc_address_.__set_hostname(ip_address_);
-    // Initialization needs to happen in the following order due to dependencies:
-    // - RPC manager, DataStreamService and DataStreamManager.
-    RETURN_IF_ERROR(rpc_mgr_->Init());
-    data_svc_.reset(new DataStreamService(rpc_metrics_));
-    RETURN_IF_ERROR(data_svc_->Init());
-    RETURN_IF_ERROR(KrpcStreamMgr()->Init(data_svc_->mem_tracker()));
-    // Bump thread cache to 1GB to reduce contention for TCMalloc central
-    // list's spinlock.
-    if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
-      FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
-    }
+  krpc_address_.__set_hostname(ip_address_);
+  // Initialization needs to happen in the following order due to dependencies:
+  // - RPC manager, DataStreamService and DataStreamManager.
+  RETURN_IF_ERROR(rpc_mgr_->Init());
+  data_svc_.reset(new DataStreamService(rpc_metrics_));
+  RETURN_IF_ERROR(data_svc_->Init());
+  RETURN_IF_ERROR(stream_mgr_->Init(data_svc_->mem_tracker()));
+  // Bump thread cache to 1GB to reduce contention for TCMalloc central
+  // list's spinlock.
+  if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
+    FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
   }
 
 #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
@@ -396,10 +386,8 @@ Status ExecEnv::StartStatestoreSubscriberService() {
 }
 
 Status ExecEnv::StartKrpcService() {
-  if (FLAGS_use_krpc) {
-    LOG(INFO) << "Starting KRPC service";
-    RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
-  }
+  LOG(INFO) << "Starting KRPC service";
+  RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
   return Status::OK();
 }
 
@@ -440,14 +428,4 @@ Status ExecEnv::GetKuduClient(
   return Status::OK();
 }
 
-DataStreamMgr* ExecEnv::ThriftStreamMgr() {
-  DCHECK(!FLAGS_use_krpc);
-  return dynamic_cast<DataStreamMgr*>(stream_mgr_.get());
-}
-
-KrpcDataStreamMgr* ExecEnv::KrpcStreamMgr() {
-  DCHECK(FLAGS_use_krpc);
-  return dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_.get());
-}
-
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 54a042a..3832d0d 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -41,8 +41,6 @@ namespace impala {
 class AdmissionController;
 class BufferPool;
 class CallableThreadPool;
-class DataStreamMgrBase;
-class DataStreamMgr;
 class DataStreamService;
 class QueryExecMgr;
 class Frontend;
@@ -109,13 +107,7 @@ class ExecEnv {
   /// StartServices() was successful.
   TNetworkAddress GetThriftBackendAddress() const;
 
-  DataStreamMgrBase* stream_mgr() { return stream_mgr_.get(); }
-
-  /// TODO: Remove once a single DataStreamMgrBase implementation is standardized on.
-  /// Clients of DataStreamMgrBase should use stream_mgr() unless they need to access
-  /// members that are not a part of the DataStreamMgrBase interface.
-  DataStreamMgr* ThriftStreamMgr();
-  KrpcDataStreamMgr* KrpcStreamMgr();
+  KrpcDataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
 
   ImpalaBackendClientCache* impalad_client_cache() {
     return impalad_client_cache_.get();
@@ -175,7 +167,7 @@ class ExecEnv {
  private:
   boost::scoped_ptr<ObjectPool> obj_pool_;
   boost::scoped_ptr<MetricGroup> metrics_;
-  boost::scoped_ptr<DataStreamMgrBase> stream_mgr_;
+  boost::scoped_ptr<KrpcDataStreamMgr> stream_mgr_;
   boost::scoped_ptr<Scheduler> scheduler_;
   boost::scoped_ptr<AdmissionController> admission_controller_;
   boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index af03a2b..11122c7 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -34,14 +34,14 @@
 #include "exec/scan-node.h"
 #include "runtime/exec-env.h"
 #include "runtime/backend-client.h"
-#include "runtime/runtime-filter-bank.h"
 #include "runtime/client-cache.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/runtime-state.h"
+#include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/query-state.h"
 #include "runtime/query-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/runtime-filter-bank.h"
+#include "runtime/runtime-state.h"
 #include "runtime/thread-resource-mgr.h"
 #include "scheduling/query-schedule.h"
 #include "util/debug-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index bc490dd..3b11c07 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -52,8 +52,8 @@
 /// TODO: We don't need millisecond precision here.
 const int32_t STREAM_EXPIRATION_TIME_MS = 300 * 1000;
 
-DECLARE_bool(use_krpc);
-DECLARE_int32(datastream_sender_timeout_ms);
+DEFINE_int32(datastream_sender_timeout_ms, 120000, "(Advanced) The time, in ms, that can "
+    "elapse  before a plan fragment will time-out trying to send the initial row batch.");
 DEFINE_int32(datastream_service_num_deserialization_threads, 16,
     "Number of threads for deserializing RPC requests deferred due to the receiver "
     "not ready or the soft limit of the receiver is reached.");
@@ -98,7 +98,7 @@ inline uint32_t KrpcDataStreamMgr::GetHashValue(
   return value;
 }
 
-shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
+shared_ptr<KrpcDataStreamRecvr> KrpcDataStreamMgr::CreateRecvr(
     const RowDescriptor* row_desc, const TUniqueId& finst_id, PlanNodeId dest_node_id,
     int num_senders, int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
     MemTracker* parent_tracker, BufferPool::ClientHandle* client) {

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index 3cd2191..5d7bc56 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -18,8 +18,6 @@
 #ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H
 #define IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H
 
-#include "runtime/data-stream-mgr-base.h"
-
 #include <list>
 #include <queue>
 #include <set>
@@ -30,7 +28,6 @@
 
 #include "common/status.h"
 #include "common/object-pool.h"
-#include "runtime/data-stream-mgr-base.h"
 #include "runtime/descriptors.h"  // for PlanNodeId
 #include "runtime/row-batch.h"
 #include "util/metrics.h"
@@ -225,7 +222,7 @@ struct EndDataStreamCtx {
 ///  time.
 ///  'total-senders-timedout-waiting-for-recvr-creation' - total number of senders that
 ///  timed-out while waiting for a receiver.
-class KrpcDataStreamMgr : public DataStreamMgrBase {
+class KrpcDataStreamMgr : public CacheLineAligned {
  public:
   KrpcDataStreamMgr(MetricGroup* metrics);
 
@@ -243,10 +240,10 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
   /// caller. 'client' is the BufferPool's client handle for allocating buffers.
   /// It's owned by the parent exchange node.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+  std::shared_ptr<KrpcDataStreamRecvr> CreateRecvr(const RowDescriptor* row_desc,
       const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
       int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-      MemTracker* parent_tracker, BufferPool::ClientHandle* client) override;
+      MemTracker* parent_tracker, BufferPool::ClientHandle* client);
 
   /// Handler for TransmitData() RPC.
   ///
@@ -286,7 +283,7 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   /// Cancels all receivers registered for fragment_instance_id immediately. The
   /// receivers will not accept any row batches after being cancelled. Any buffered
   /// row batches will not be freed until Close() is called on the receivers.
-  void Cancel(const TUniqueId& fragment_instance_id) override;
+  void Cancel(const TUniqueId& fragment_instance_id);
 
   /// Waits for maintenance thread and sender response thread pool to finish.
   ~KrpcDataStreamMgr();

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 3933e02..96cc25f 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -43,7 +43,6 @@
 
 #include "common/names.h"
 
-DECLARE_bool(use_krpc);
 DECLARE_int32(datastream_service_num_deserialization_threads);
 
 using kudu::MonoDelta;

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index 18454d7..e19f686 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -18,8 +18,6 @@
 #ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H
 #define IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H
 
-#include "data-stream-recvr-base.h"
-
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 
@@ -84,7 +82,7 @@ class TransmitDataResponsePB;
 /// - no new row batch or deferred RPCs should be added to a cancelled sender queue
 /// - Cancel() will drain the deferred RPCs queue and the row batch queue
 ///
-class KrpcDataStreamRecvr : public DataStreamRecvrBase {
+class KrpcDataStreamRecvr {
  public:
   ~KrpcDataStreamRecvr();
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 6928ebc..5cc80ce 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -34,10 +34,10 @@
 #include "exprs/timezone_db.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
-#include "runtime/data-stream-mgr-base.h"
-#include "runtime/data-stream-recvr.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec-env.h"
+#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter-bank.h"
@@ -302,7 +302,7 @@ io::DiskIoMgr* RuntimeState::io_mgr() {
   return exec_env_->disk_io_mgr();
 }
 
-DataStreamMgrBase* RuntimeState::stream_mgr() {
+KrpcDataStreamMgr* RuntimeState::stream_mgr() {
   return exec_env_->stream_mgr();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 6d38fc6..78a9864 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -36,6 +36,7 @@ class BufferPool;
 class DataStreamRecvr;
 class DescriptorTbl;
 class Expr;
+class KrpcDataStreamMgr;
 class LlvmCodeGen;
 class MemTracker;
 class ObjectPool;
@@ -47,7 +48,6 @@ class TimestampValue;
 class ThreadResourcePool;
 class TUniqueId;
 class ExecEnv;
-class DataStreamMgrBase;
 class HBaseTableFactory;
 class TPlanFragmentCtx;
 class TPlanFragmentInstanceCtx;
@@ -107,7 +107,7 @@ class RuntimeState {
         : no_instance_id_;
   }
   ExecEnv* exec_env() { return exec_env_; }
-  DataStreamMgrBase* stream_mgr();
+  KrpcDataStreamMgr* stream_mgr();
   HBaseTableFactory* htable_factory();
   ImpalaBackendClientCache* impalad_client_cache();
   CatalogServiceClientCache* catalogd_client_cache();

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 2baffbb..20acc43 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -46,8 +46,6 @@ using namespace apache::thrift;
 using namespace org::apache::impala::fb;
 using namespace strings;
 
-DECLARE_bool(use_krpc);
-
 namespace impala {
 
 static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
@@ -75,12 +73,10 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
   // requests.
   local_backend_descriptor_.ip_address = ip;
   LOG(INFO) << "Scheduler using " << ip << " as IP address";
-  if (FLAGS_use_krpc) {
-    // KRPC relies on resolved IP address.
-    DCHECK(IsResolvedAddress(krpc_address));
-    DCHECK_EQ(krpc_address.hostname, ip);
-    local_backend_descriptor_.__set_krpc_address(krpc_address);
-  }
+  // KRPC relies on resolved IP address.
+  DCHECK(IsResolvedAddress(krpc_address));
+  DCHECK_EQ(krpc_address.hostname, ip);
+  local_backend_descriptor_.__set_krpc_address(krpc_address);
 
   coord_only_backend_config_.AddBackend(local_backend_descriptor_);
 
@@ -346,12 +342,10 @@ void Scheduler::ComputeFragmentExecParams(
         dest.__set_fragment_instance_id(dest_params->instance_exec_params[i].instance_id);
         const TNetworkAddress& host = dest_params->instance_exec_params[i].host;
         dest.__set_thrift_backend(host);
-        if (FLAGS_use_krpc) {
-          const TBackendDescriptor& desc = LookUpBackendDesc(executor_config, host);
-          DCHECK(desc.__isset.krpc_address);
-          DCHECK(IsResolvedAddress(desc.krpc_address));
-          dest.__set_krpc_backend(desc.krpc_address);
-        }
+        const TBackendDescriptor& desc = LookUpBackendDesc(executor_config, host);
+        DCHECK(desc.__isset.krpc_address);
+        DCHECK(IsResolvedAddress(desc.krpc_address));
+        dest.__set_krpc_backend(desc.krpc_address);
       }
 
       // enumerate senders consecutively;

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 1d42a99..b7892ff 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -74,14 +74,14 @@ Status DataStreamService::Init() {
 void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request,
     EndDataStreamResponsePB* response, RpcContext* rpc_context) {
   // CloseSender() is guaranteed to eventually respond to this RPC so we don't do it here.
-  ExecEnv::GetInstance()->KrpcStreamMgr()->CloseSender(request, response, rpc_context);
+  ExecEnv::GetInstance()->stream_mgr()->CloseSender(request, response, rpc_context);
 }
 
 void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
     TransmitDataResponsePB* response, RpcContext* rpc_context) {
   FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
   // AddData() is guaranteed to eventually respond to this RPC so we don't do it here.
-  ExecEnv::GetInstance()->KrpcStreamMgr()->AddData(request, response, rpc_context);
+  ExecEnv::GetInstance()->stream_mgr()->AddData(request, response, rpc_context);
 }
 
 template<typename ResponsePBType>

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-internal-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 53a62da..c479a7f 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -80,15 +80,6 @@ void ImpalaInternalService::ReportExecStatus(TReportExecStatusResult& return_val
   impala_server_->ReportExecStatus(return_val, params);
 }
 
-void ImpalaInternalService::TransmitData(TTransmitDataResult& return_val,
-    const TTransmitDataParams& params) {
-  FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
-  DCHECK(params.__isset.dest_fragment_instance_id);
-  DCHECK(params.__isset.sender_id);
-  DCHECK(params.__isset.dest_node_id);
-  impala_server_->TransmitData(return_val, params);
-}
-
 void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
     const TUpdateFilterParams& params) {
   FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 3285d9d..8d5ddd5 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -37,8 +37,6 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
       const TCancelQueryFInstancesParams& params);
   virtual void ReportExecStatus(TReportExecStatusResult& return_val,
       const TReportExecStatusParams& params);
-  virtual void TransmitData(TTransmitDataResult& return_val,
-      const TTransmitDataParams& params);
   virtual void UpdateFilter(TUpdateFilterResult& return_val,
       const TUpdateFilterParams& params);
   virtual void PublishFilter(TPublishFilterResult& return_val,

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 97fa70e..077f634 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -52,7 +52,6 @@
 #include "rpc/thrift-util.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
-#include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/lib-cache.h"
 #include "runtime/mem-tracker.h"
@@ -119,7 +118,6 @@ DECLARE_string(authorized_proxy_group_config);
 DECLARE_string(authorized_proxy_group_config_delimiter);
 DECLARE_bool(abort_on_config_error);
 DECLARE_bool(disk_spill_encryption);
-DECLARE_bool(use_krpc);
 DECLARE_bool(use_local_catalog);
 
 DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served."
@@ -1268,33 +1266,6 @@ void ImpalaServer::ReportExecStatus(
   request_state->UpdateBackendExecStatus(params).SetTStatus(&return_val);
 }
 
-void ImpalaServer::TransmitData(
-    TTransmitDataResult& return_val, const TTransmitDataParams& params) {
-  VLOG_ROW << "TransmitData(): instance_id=" << PrintId(params.dest_fragment_instance_id)
-           << " node_id=" << params.dest_node_id
-           << " #rows=" << params.row_batch.num_rows
-           << " sender_id=" << params.sender_id
-           << " eos=" << (params.eos ? "true" : "false");
-  // TODO: fix Thrift so we can simply take ownership of thrift_batch instead
-  // of having to copy its data
-  if (params.row_batch.num_rows > 0) {
-    Status status = exec_env_->ThriftStreamMgr()->AddData(
-        params.dest_fragment_instance_id, params.dest_node_id, params.row_batch,
-        params.sender_id);
-    status.SetTStatus(&return_val);
-    if (!status.ok()) {
-      // should we close the channel here as well?
-      return;
-    }
-  }
-
-  if (params.eos) {
-    exec_env_->ThriftStreamMgr()->CloseSender(
-        params.dest_fragment_instance_id, params.dest_node_id,
-        params.sender_id).SetTStatus(&return_val);
-  }
-}
-
 void ImpalaServer::InitializeConfigVariables() {
   // Set idle_session_timeout here to let the SET command return the value of
   // the command line option FLAGS_idle_session_timeout
@@ -1755,11 +1726,10 @@ void ImpalaServer::AddLocalBackendToStatestore(
   local_backend_descriptor.ip_address = exec_env_->ip_address();
   local_backend_descriptor.__set_proc_mem_limit(
       exec_env_->process_mem_tracker()->limit());
-  if (FLAGS_use_krpc) {
-    const TNetworkAddress& krpc_address = exec_env_->krpc_address();
-    DCHECK(IsResolvedAddress(krpc_address));
-    local_backend_descriptor.__set_krpc_address(krpc_address);
-  }
+  const TNetworkAddress& krpc_address = exec_env_->krpc_address();
+  DCHECK(IsResolvedAddress(krpc_address));
+  local_backend_descriptor.__set_krpc_address(krpc_address);
+
   subscriber_topic_updates->emplace_back(TTopicDelta());
   TTopicDelta& update = subscriber_topic_updates->back();
   update.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index a31734b..8b7af09 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -59,8 +59,6 @@ class TPlanExecParams;
 class TInsertResult;
 class TReportExecStatusArgs;
 class TReportExecStatusResult;
-class TTransmitDataArgs;
-class TTransmitDataResult;
 class TNetworkAddress;
 class TClientRequest;
 class TExecRequest;
@@ -273,8 +271,6 @@ class ImpalaServer : public ImpalaServiceIf,
   /// ImpalaInternalService rpcs
   void ReportExecStatus(TReportExecStatusResult& return_val,
       const TReportExecStatusParams& params);
-  void TransmitData(TTransmitDataResult& return_val,
-      const TTransmitDataParams& params);
   void UpdateFilter(TUpdateFilterResult& return_val,
       const TUpdateFilterParams& params);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/bin/run-all-tests.sh
----------------------------------------------------------------------
diff --git a/bin/run-all-tests.sh b/bin/run-all-tests.sh
index 83b3548..5f6831e 100755
--- a/bin/run-all-tests.sh
+++ b/bin/run-all-tests.sh
@@ -39,8 +39,6 @@ if "${CLUSTER_DIR}/admin" is_kerberized; then
 fi
 
 # Parametrized Test Options
-# Disable KRPC for test cluster and test execution
-: ${DISABLE_KRPC:=false}
 # Run FE Tests
 : ${FE_TEST:=true}
 # Run Backend Tests
@@ -77,11 +75,6 @@ if [[ "${ERASURE_CODING}" = true ]]; then
     --impalad_args=--default_query_options=allow_erasure_coded_files=true"
 fi
 
-# If KRPC tests are disabled, pass the flag to disable KRPC during cluster start.
-if [[ "${DISABLE_KRPC}" == "true" ]]; then
-  TEST_START_CLUSTER_ARGS="${TEST_START_CLUSTER_ARGS} --disable_krpc"
-fi
-
 # Indicates whether code coverage reports should be generated.
 : ${CODE_COVERAGE:=false}
 
@@ -130,12 +123,6 @@ if [[ "${TARGET_FILESYSTEM}" == "local" ]]; then
   COMMON_PYTEST_ARGS+=" --impalad=localhost:21000"
 fi
 
-# If KRPC tests are disabled, pass test_no_krpc flag to pytest.
-# This includes the end-to-end tests and the custom cluster tests.
-if [[ "${DISABLE_KRPC}" == "true" ]]; then
-  COMMON_PYTEST_ARGS+=" --test_no_krpc"
-fi
-
 # For logging when using run-step.
 LOG_DIR="${IMPALA_EE_TEST_LOGS_DIR}"
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 4d34d45..208cc35 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -60,8 +60,6 @@ parser.add_option("--state_store_args", dest="state_store_args", action="append"
 parser.add_option("--catalogd_args", dest="catalogd_args", action="append",
                   type="string", default=[],
                   help="Additional arguments to pass to the Catalog Service at startup")
-parser.add_option("--disable_krpc", dest="disable_krpc", action="store_true",
-                  default=False, help="Disable KRPC DataStream service during startup.")
 parser.add_option("--kill", "--kill_only", dest="kill_only", action="store_true",
                   default=False, help="Instead of starting the cluster, just kill all"
                   " the running impalads and the statestored.")
@@ -330,9 +328,6 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
           delay=delay_list[i],
           args=args)
 
-    if options.disable_krpc:
-      args = "-use_krpc=false {args}".format(args=args)
-
     # Appended at the end so they can override previous args.
     if i < len(per_impalad_args):
       args = "{args} {per_impalad_args}".format(

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 4797eba..bf48aaa 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -704,34 +704,6 @@ struct TCancelQueryFInstancesResult {
   1: optional Status.TStatus status
 }
 
-
-// TransmitData
-
-struct TTransmitDataParams {
-  1: required ImpalaInternalServiceVersion protocol_version
-
-  // required in V1
-  2: optional Types.TUniqueId dest_fragment_instance_id
-
-  // Id of this fragment in its role as a sender.
-  3: optional i32 sender_id
-
-  // required in V1
-  4: optional Types.TPlanNodeId dest_node_id
-
-  // optional in V1
-  5: optional Results.TRowBatch row_batch
-
-  // if set to true, indicates that no more row batches will be sent
-  // for this dest_node_id
-  6: optional bool eos
-}
-
-struct TTransmitDataResult {
-  // required in V1
-  1: optional Status.TStatus status
-}
-
 // Parameters for RequestPoolService.resolveRequestPool()
 // TODO: why is this here?
 struct TResolveRequestPoolParams {
@@ -881,10 +853,6 @@ service ImpalaInternalService {
   TCancelQueryFInstancesResult CancelQueryFInstances(
       1:TCancelQueryFInstancesParams params);
 
-  // Called by sender to transmit single row batch. Returns error indication
-  // if params.fragmentId or params.destNodeId are unknown or if data couldn't be read.
-  TTransmitDataResult TransmitData(1:TTransmitDataParams params);
-
   // Called by fragment instances that produce local runtime filters to deliver them to
   // the coordinator for aggregation and broadcast.
   TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params);

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index a5c285d..5947ce9 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -145,9 +145,6 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if use_exclusive_coordinators:
       cmd.append("--use_exclusive_coordinators")
 
-    if pytest.config.option.test_no_krpc:
-      cmd.append("--disable_krpc")
-
     if os.environ.get("ERASURE_CODING") == "true":
       cmd.append("--impalad_args=--default_query_options=allow_erasure_coded_files=true")
 

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/common/skip.py
----------------------------------------------------------------------
diff --git a/tests/common/skip.py b/tests/common/skip.py
index e84c75b..d23b260 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -89,10 +89,6 @@ class SkipIf:
   not_ec = pytest.mark.skipif(not IS_EC, reason="Erasure Coding needed")
   no_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM,
       reason="Secondary filesystem needed")
-  not_krpc = pytest.mark.skipif(pytest.config.option.test_no_krpc,
-      reason="Test is only supported when using KRPC.")
-  not_thrift = pytest.mark.skipif(not pytest.config.option.test_no_krpc,
-      reason="Test is only supported when using Thrift RPC.")
 
 class SkipIfIsilon:
   caching = pytest.mark.skipif(IS_ISILON, reason="SET CACHED not implemented for Isilon")

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/common/test_skip.py
----------------------------------------------------------------------
diff --git a/tests/common/test_skip.py b/tests/common/test_skip.py
deleted file mode 100644
index 3e7d281..0000000
--- a/tests/common/test_skip.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pytest
-
-from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIf
-
-class TestSkipIf(ImpalaTestSuite):
-  """
-  This suite tests the effectiveness of various SkipIf decorators.
-  TODO: Remove this once we have tests that make use of these decorators.
-  """
-
-  @classmethod
-  def get_workload(cls):
-    return 'functional-query'
-
-  @SkipIf.not_krpc
-  def test_skip_if_not_krpc(self):
-    assert not pytest.config.option.test_no_krpc
-
-  @SkipIf.not_thrift
-  def test_skip_if_not_thrift(self):
-    assert pytest.config.option.test_no_krpc

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/conftest.py b/tests/conftest.py
index a1f1859..f01ecb2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -116,10 +116,6 @@ def pytest_addoption(parser):
                    help=("Indicates that tests are being run against a remote cluster. "
                          "Some tests may be marked to skip or xfail on remote clusters."))
 
-  parser.addoption("--test_no_krpc", dest="test_no_krpc", action="store_true",
-                   default=False, help="Run all tests with KRPC disabled. This assumes "
-                   "that the test cluster has been started with --disable_krpc.")
-
   parser.addoption("--shard_tests", default=None,
                    help="If set to N/M (e.g., 3/5), will split the tests into "
                    "M partitions and run the Nth partition. 1-indexed.")

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/custom_cluster/test_krpc_mem_usage.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_mem_usage.py b/tests/custom_cluster/test_krpc_mem_usage.py
index d358baa..6b0fe6a 100644
--- a/tests/custom_cluster/test_krpc_mem_usage.py
+++ b/tests/custom_cluster/test_krpc_mem_usage.py
@@ -26,7 +26,6 @@ DATA_STREAM_MGR_METRIC = "Data Stream Manager Early RPCs"
 DATA_STREAM_SVC_METRIC = "Data Stream Service Queue"
 ALL_METRICS = [ DATA_STREAM_MGR_METRIC, DATA_STREAM_SVC_METRIC ]
 
-@SkipIf.not_krpc
 class TestKrpcMemUsage(CustomClusterTestSuite):
   """Test for memory usage tracking when using KRPC."""
   TEST_QUERY = "select count(c2.string_col) from \

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/custom_cluster/test_krpc_metrics.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_metrics.py b/tests/custom_cluster/test_krpc_metrics.py
index de16ccc..e728237 100644
--- a/tests/custom_cluster/test_krpc_metrics.py
+++ b/tests/custom_cluster/test_krpc_metrics.py
@@ -24,7 +24,6 @@ from tests.common.impala_cluster import ImpalaCluster
 from tests.common.skip import SkipIf, SkipIfBuildType
 from tests.verifiers.mem_usage_verifier import MemUsageVerifier
 
-@SkipIf.not_krpc
 class TestKrpcMetrics(CustomClusterTestSuite):
   """Test for KRPC metrics that require special arguments during cluster startup."""
   RPCZ_URL = 'http://localhost:25000/rpcz?json'

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/custom_cluster/test_rpc_exception.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_rpc_exception.py b/tests/custom_cluster/test_rpc_exception.py
index 2784e88..fc60c1a 100644
--- a/tests/custom_cluster/test_rpc_exception.py
+++ b/tests/custom_cluster/test_rpc_exception.py
@@ -68,12 +68,6 @@ class TestRPCException(CustomClusterTestSuite):
   def test_rpc_send_timed_out(self, vector):
     self.execute_test_query(None)
 
-  @SkipIf.not_thrift
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4")
-  def test_rpc_recv_closed_connection(self, vector):
-    self.execute_test_query("Called read on non-open socket")
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5")
   def test_rpc_recv_timed_out(self, vector):
@@ -94,12 +88,6 @@ class TestRPCException(CustomClusterTestSuite):
   def test_rpc_secure_send_timed_out(self, vector):
     self.execute_test_query(None)
 
-  @SkipIf.not_thrift
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=9")
-  def test_rpc_secure_recv_closed_connection(self, vector):
-    self.execute_test_query("TTransportException: Transport not open")
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=10")
   def test_rpc_secure_recv_timed_out(self, vector):

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/query_test/test_codegen.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_codegen.py b/tests/query_test/test_codegen.py
index 5bc0cc6..7436c13 100644
--- a/tests/query_test/test_codegen.py
+++ b/tests/query_test/test_codegen.py
@@ -52,7 +52,6 @@ class TestCodegen(ImpalaTestSuite):
     assert len(exec_options) > 0
     assert_codegen_enabled(result.runtime_profile, [1])
 
-  @SkipIf.not_krpc
   def test_datastream_sender_codegen(self, vector):
     """Test the KrpcDataStreamSender's codegen logic"""
     self.run_test_case('QueryTest/datastream-sender-codegen', vector)

http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/webserver/test_web_pages.py
----------------------------------------------------------------------
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 60deca4..000e386 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -249,7 +249,6 @@ class TestWebPage(ImpalaTestSuite):
       assert any(pattern in t for t in thread_names), \
            "Could not find thread matching '%s'" % pattern
 
-  @SkipIf.not_krpc
   def test_krpc_rpcz(self):
     """Test that KRPC metrics are exposed in /rpcz and that they are updated when
     executing a query."""