You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2018/07/24 18:01:44 UTC
[3/5] impala git commit: IMPALA-7212: Removes --use_krpc flag and
removes old DataStream services
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 9f866ba..c1f9cc6 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -30,14 +30,10 @@
#include "rpc/rpc-mgr.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
-#include "runtime/data-stream-mgr-base.h"
-#include "runtime/data-stream-mgr.h"
#include "runtime/exec-env.h"
#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-recvr.h"
#include "runtime/krpc-data-stream-sender.h"
-#include "runtime/data-stream-sender.h"
-#include "runtime/data-stream-recvr-base.h"
-#include "runtime/data-stream-recvr.h"
#include "runtime/descriptors.h"
#include "runtime/client-cache.h"
#include "runtime/backend-client.h"
@@ -55,8 +51,6 @@
#include "util/test-info.h"
#include "util/tuple-row-compare.h"
#include "gen-cpp/data_stream_service.pb.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
#include "gen-cpp/Types_types.h"
#include "gen-cpp/Descriptors_types.h"
#include "service/fe-support.h"
@@ -82,8 +76,6 @@ DECLARE_int32(datastream_service_num_deserialization_threads);
DECLARE_int32(datastream_service_deserialization_queue_size);
DECLARE_string(datastream_service_queue_mem_limit);
-DECLARE_bool(use_krpc);
-
static const PlanNodeId DEST_NODE_ID = 1;
static const int BATCH_CAPACITY = 100; // rows
static const int PER_ROW_DATA = 8;
@@ -93,39 +85,6 @@ static const int SHORT_SERVICE_QUEUE_MEM_LIMIT = 16;
namespace impala {
-// This class acts as a service interface for all Thrift related communication within
-// this test file.
-class ImpalaThriftTestBackend : public ImpalaInternalServiceIf {
- public:
- ImpalaThriftTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {}
- virtual ~ImpalaThriftTestBackend() {}
-
- virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
- const TExecQueryFInstancesParams& params) {}
- virtual void CancelQueryFInstances(TCancelQueryFInstancesResult& return_val,
- const TCancelQueryFInstancesParams& params) {}
- virtual void ReportExecStatus(TReportExecStatusResult& return_val,
- const TReportExecStatusParams& params) {}
- virtual void UpdateFilter(TUpdateFilterResult& return_val,
- const TUpdateFilterParams& params) {}
- virtual void PublishFilter(TPublishFilterResult& return_val,
- const TPublishFilterParams& params) {}
-
- virtual void TransmitData(
- TTransmitDataResult& return_val, const TTransmitDataParams& params) {
- if (!params.eos) {
- mgr_->AddData(params.dest_fragment_instance_id, params.dest_node_id,
- params.row_batch, params.sender_id).SetTStatus(&return_val);
- } else {
- mgr_->CloseSender(params.dest_fragment_instance_id, params.dest_node_id,
- params.sender_id).SetTStatus(&return_val);
- }
- }
-
- private:
- DataStreamMgr* mgr_;
-};
-
// This class acts as a service interface for all KRPC related communication within
// this test file.
class ImpalaKRPCTestBackend : public DataStreamServiceIf {
@@ -166,18 +125,7 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
unique_ptr<MemTracker> mem_tracker_;
};
-template <class T> class DataStreamTestBase : public T {
- protected:
- virtual void SetUp() {}
- virtual void TearDown() {}
-};
-
-enum KrpcSwitch {
- USE_THRIFT,
- USE_KRPC
-};
-
-class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwitch>> {
+class DataStreamTest : public testing::Test {
protected:
DataStreamTest() : next_val_(0) {
// Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
@@ -186,9 +134,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
~DataStreamTest() { runtime_state_->ReleaseResources(); }
virtual void SetUp() {
- // Initialize MemTrackers and RuntimeState for use by the data stream receiver.
- FLAGS_use_krpc = GetParam() == USE_KRPC;
-
exec_env_.reset(new ExecEnv());
ABORT_IF_ERROR(exec_env_->InitForFeTests());
exec_env_->InitBufferPool(32 * 1024, 1024 * 1024 * 1024, 32 * 1024);
@@ -234,14 +179,10 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
hash_sink_.output_partition.__isset.partition_exprs = true;
hash_sink_.output_partition.partition_exprs.push_back(expr);
- if (GetParam() == USE_THRIFT) {
- StartThriftBackend();
- } else {
- IpAddr ip;
- ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
- krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
- StartKrpcBackend();
- }
+ IpAddr ip;
+ ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+ krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
+ StartKrpcBackend();
}
const TDataSink GetSink(TPartitionType::type partition_type) {
@@ -267,12 +208,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
less_than_->Close(runtime_state_.get());
ScalarExpr::Close(ordering_exprs_);
mem_pool_->FreeAll();
- if (GetParam() == USE_THRIFT) {
- exec_env_->impalad_client_cache()->TestShutdown();
- StopThriftBackend();
- } else {
- StopKrpcBackend();
- }
+ StopKrpcBackend();
exec_env_->buffer_pool()->DeregisterClient(&buffer_pool_client_);
}
@@ -312,8 +248,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
unique_ptr<ImpalaKRPCTestBackend> test_service_;
// receiving node
- DataStreamMgrBase* stream_mgr_ = nullptr;
- ThriftServer* server_ = nullptr;
+ KrpcDataStreamMgr* stream_mgr_ = nullptr;
// sending node(s)
TDataStreamSink broadcast_sink_;
@@ -335,7 +270,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
int receiver_num;
unique_ptr<thread> thread_handle;
- shared_ptr<DataStreamRecvrBase> stream_recvr;
+ shared_ptr<KrpcDataStreamRecvr> stream_recvr;
Status status;
int num_rows_received = 0;
multiset<int64_t> data_values;
@@ -360,9 +295,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
dest.fragment_instance_id = next_instance_id_;
dest.thrift_backend.hostname = "localhost";
dest.thrift_backend.port = FLAGS_port;
- if (GetParam() == USE_KRPC) {
- dest.__set_krpc_backend(krpc_address_);
- }
+ dest.__set_krpc_backend(krpc_address_);
*instance_id = next_instance_id_;
++next_instance_id_.lo;
}
@@ -524,7 +457,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
// hash-partitioned streams send values to the right partition
int64_t value = *j;
uint64_t hash_val = RawValue::GetHashValueFastHash(&value, TYPE_BIGINT,
- DataStreamSender::EXCHANGE_HASH_SEED);
+ KrpcDataStreamSender::EXCHANGE_HASH_SEED);
EXPECT_EQ(hash_val % receiver_info_.size(), info->receiver_num);
}
}
@@ -550,21 +483,9 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
}
}
- // Start Thrift based backend in separate thread.
- void StartThriftBackend() {
- // Dynamic cast stream_mgr_ which is of type DataStreamMgrBase to derived type
- // DataStreamMgr, since ImpalaThriftTestBackend() accepts only DataStreamMgr*.
- boost::shared_ptr<ImpalaThriftTestBackend> handler(
- new ImpalaThriftTestBackend(exec_env_->ThriftStreamMgr()));
- boost::shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler));
- ThriftServerBuilder builder("DataStreamTest backend", processor, FLAGS_port);
- ASSERT_OK(builder.Build(&server_));
- ASSERT_OK(server_->Start());
- }
-
void StartKrpcBackend() {
RpcMgr* rpc_mgr = exec_env_->rpc_mgr();
- KrpcDataStreamMgr* krpc_stream_mgr = exec_env_->KrpcStreamMgr();
+ KrpcDataStreamMgr* krpc_stream_mgr = exec_env_->stream_mgr();
ASSERT_OK(rpc_mgr->Init());
test_service_.reset(new ImpalaKRPCTestBackend(rpc_mgr, krpc_stream_mgr,
exec_env_->process_mem_tracker()));
@@ -573,12 +494,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
ASSERT_OK(rpc_mgr->StartServices(krpc_address_));
}
- void StopThriftBackend() {
- VLOG_QUERY << "stop backend\n";
- server_->StopForTesting();
- delete server_;
- }
-
void StopKrpcBackend() {
exec_env_->rpc_mgr()->Shutdown();
}
@@ -590,7 +505,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
sender_info_.emplace_back(make_unique<SenderInfo>());
sender_info_.back()->thread_handle.reset(
new thread(&DataStreamTest::Sender, this, num_senders, channel_buffer_size,
- partition_type, GetParam() == USE_THRIFT));
+ partition_type));
}
void JoinSenders() {
@@ -600,8 +515,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
}
}
- void Sender(int sender_num,
- int channel_buffer_size, TPartitionType::type partition_type, bool is_thrift) {
+ void Sender(
+ int sender_num, int channel_buffer_size, TPartitionType::type partition_type) {
RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
VLOG_QUERY << "create sender " << sender_num;
const TDataSink& sink = GetSink(partition_type);
@@ -615,18 +530,10 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
TExpr output_exprs;
output_exprs.nodes.push_back(expr_node);
- if (is_thrift) {
- sender.reset(new DataStreamSender(
- sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
- EXPECT_OK(static_cast<DataStreamSender*>(
- sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
- } else {
- sender.reset(new KrpcDataStreamSender(
- sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
- EXPECT_OK(static_cast<KrpcDataStreamSender*>(
- sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
- }
-
+ sender.reset(new KrpcDataStreamSender(
+ sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size, &state));
+ EXPECT_OK(static_cast<KrpcDataStreamSender*>(
+ sender.get())->Init(vector<TExpr>({output_exprs}), sink, &state));
EXPECT_OK(sender->Prepare(&state, &tracker_));
EXPECT_OK(sender->Open(&state));
scoped_ptr<RowBatch> batch(CreateRowBatch());
@@ -641,13 +548,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
VLOG_QUERY << "closing sender" << sender_num;
info->status.MergeStatus(sender->FlushFinal(&state));
sender->Close(&state);
- if (is_thrift) {
- info->num_bytes_sent = static_cast<DataStreamSender*>(
- sender.get())->GetNumDataBytesSent();
- } else {
- info->num_bytes_sent = static_cast<KrpcDataStreamSender*>(
- sender.get())->GetNumDataBytesSent();
- }
+ info->num_bytes_sent = static_cast<KrpcDataStreamSender*>(
+ sender.get())->GetNumDataBytesSent();
batch->Reset();
state.ReleaseResources();
@@ -672,18 +574,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
}
};
-// A seperate class for tests that are required to be run against Thrift only.
-class DataStreamTestThriftOnly : public DataStreamTest {
- protected:
- virtual void SetUp() {
- DataStreamTest::SetUp();
- }
-
- virtual void TearDown() {
- DataStreamTest::TearDown();
- }
-};
-
// A seperate test class which simulates the behavior in which deserialization queue
// fills up and all deserialization threads are busy.
class DataStreamTestShortDeserQueue : public DataStreamTest {
@@ -715,19 +605,7 @@ class DataStreamTestShortServiceQueue : public DataStreamTest {
}
};
-INSTANTIATE_TEST_CASE_P(ThriftOrKrpc, DataStreamTest,
- ::testing::Values(USE_KRPC, USE_THRIFT));
-
-INSTANTIATE_TEST_CASE_P(ThriftOnly, DataStreamTestThriftOnly,
- ::testing::Values(USE_THRIFT));
-
-INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestShortDeserQueue,
- ::testing::Values(USE_KRPC));
-
-INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestShortServiceQueue,
- ::testing::Values(USE_KRPC));
-
-TEST_P(DataStreamTest, UnknownSenderSmallResult) {
+TEST_F(DataStreamTest, UnknownSenderSmallResult) {
// starting a sender w/o a corresponding receiver results in an error. No bytes should
// be sent.
// case 1: entire query result fits in single buffer
@@ -738,7 +616,7 @@ TEST_P(DataStreamTest, UnknownSenderSmallResult) {
EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
}
-TEST_P(DataStreamTest, UnknownSenderLargeResult) {
+TEST_F(DataStreamTest, UnknownSenderLargeResult) {
// case 2: query result requires multiple buffers
TUniqueId dummy_id;
GetNextInstanceId(&dummy_id);
@@ -747,7 +625,7 @@ TEST_P(DataStreamTest, UnknownSenderLargeResult) {
EXPECT_EQ(sender_info_[0]->status.code(), TErrorCode::DATASTREAM_SENDER_TIMEOUT);
}
-TEST_P(DataStreamTest, Cancel) {
+TEST_F(DataStreamTest, Cancel) {
TUniqueId instance_id;
StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, &instance_id);
stream_mgr_->Cancel(instance_id);
@@ -758,7 +636,7 @@ TEST_P(DataStreamTest, Cancel) {
EXPECT_TRUE(receiver_info_[1]->status.IsCancelled());
}
-TEST_P(DataStreamTest, BasicTest) {
+TEST_F(DataStreamTest, BasicTest) {
// TODO: also test that all client connections have been returned
TPartitionType::type stream_types[] =
{TPartitionType::UNPARTITIONED, TPartitionType::RANDOM,
@@ -781,54 +659,6 @@ TEST_P(DataStreamTest, BasicTest) {
}
}
-// This test checks for the avoidance of IMPALA-2931, which is a crash that would occur if
-// the parent memtracker of a DataStreamRecvr's memtracker was deleted before the
-// DataStreamRecvr was destroyed. The fix was to move decoupling the child tracker from
-// the parent into DataStreamRecvr::Close() which should always be called before the
-// parent is destroyed. In practice the parent is a member of the query's runtime state.
-//
-// TODO: Make lifecycle requirements more explicit.
-TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
- scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), exec_env_.get()));
- RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
-
- // Start just one receiver.
- TUniqueId instance_id;
- GetNextInstanceId(&instance_id);
- shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(row_desc_,
- instance_id, DEST_NODE_ID, 1, 1, false, profile, &tracker_, nullptr);
-
- // Perform tear down, but keep a reference to the receiver so that it is deleted last
- // (to confirm that the destructor does not access invalid state after tear-down).
- stream_recvr->Close();
-
- // Force deletion of the parent memtracker by destroying it's owning runtime state.
- runtime_state->ReleaseResources();
- runtime_state.reset();
-
- // Send an eos RPC to the receiver. Not required for tear-down, but confirms that the
- // RPC does not cause an error (the receiver will still be called, since it is only
- // Close()'d, not deleted from the data stream manager).
- Status rpc_status;
- ImpalaBackendConnection client(exec_env_->impalad_client_cache(),
- MakeNetworkAddress("localhost", FLAGS_port), &rpc_status);
- EXPECT_OK(rpc_status);
- TTransmitDataParams params;
- params.protocol_version = ImpalaInternalServiceVersion::V1;
- params.__set_eos(true);
- params.__set_dest_fragment_instance_id(instance_id);
- params.__set_dest_node_id(DEST_NODE_ID);
- TUniqueId dummy_id;
- params.__set_sender_id(0);
-
- TTransmitDataResult result;
- rpc_status = client.DoRpc(&ImpalaBackendClient::TransmitData, params, &result);
-
- // Finally, stream_recvr destructor happens here. Before fix for IMPALA-2931, this
- // would have resulted in a crash.
- stream_recvr.reset();
-}
-
// This test is to exercise a previously present deadlock path which is now fixed, to
// ensure that the deadlock does not happen anymore. It does this by doing the following:
// This test starts multiple senders to send to the same receiver. It makes sure that
@@ -841,7 +671,7 @@ TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
// already being deserialized will be waiting on the KrpcDataStreamMgr::lock_ as well.
// But the first thread will never release the lock since it's stuck on Offer(), causing
// a deadlock. This is fixed with IMPALA-6346.
-TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
+TEST_F(DataStreamTestShortDeserQueue, TestNoDeadlock) {
TUniqueId instance_id;
GetNextInstanceId(&instance_id);
@@ -874,7 +704,7 @@ TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
}
// Test that payloads larger than the service queue's soft mem limit can be transmitted.
-TEST_P(DataStreamTestShortServiceQueue, TestLargePayload) {
+TEST_F(DataStreamTestShortServiceQueue, TestLargePayload) {
TestStream(
TPartitionType::UNPARTITIONED, 4, 1, SHORT_SERVICE_QUEUE_MEM_LIMIT * 2, false);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 349b817..319e948 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -35,7 +35,6 @@
#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/client-cache.h"
#include "runtime/coordinator.h"
-#include "runtime/data-stream-mgr.h"
#include "runtime/hbase-table-factory.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/io/disk-io-mgr.h"
@@ -80,8 +79,6 @@ DEFINE_int32(state_store_subscriber_port, 23000,
"port where StatestoreSubscriberService should be exported");
DEFINE_int32(num_hdfs_worker_threads, 16,
"(Advanced) The number of threads in the global HDFS operation pool");
-DEFINE_bool(use_krpc, true, "If true, use KRPC for the DataStream subsystem. "
- "Otherwise use Thrift RPC.");
DEFINE_bool_hidden(use_local_catalog, false,
"Use experimental implementation of a local catalog. If this is set, "
@@ -159,15 +156,10 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, backend_port)) {
- if (FLAGS_use_krpc) {
- VLOG_QUERY << "Using KRPC.";
- // KRPC relies on resolved IP address. It's set in Init().
- krpc_address_.__set_port(krpc_port);
- rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
- stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
- } else {
- stream_mgr_.reset(new DataStreamMgr(metrics_.get()));
- }
+ // KRPC relies on resolved IP address. It's set in Init().
+ krpc_address_.__set_port(krpc_port);
+ rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
+ stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
request_pool_service_.reset(new RequestPoolService(metrics_.get()));
@@ -298,19 +290,17 @@ Status ExecEnv::Init() {
"Buffer Pool: Unused Reservation", mem_tracker_.get()));
// Initializes the RPCMgr and DataStreamServices.
- if (FLAGS_use_krpc) {
- krpc_address_.__set_hostname(ip_address_);
- // Initialization needs to happen in the following order due to dependencies:
- // - RPC manager, DataStreamService and DataStreamManager.
- RETURN_IF_ERROR(rpc_mgr_->Init());
- data_svc_.reset(new DataStreamService(rpc_metrics_));
- RETURN_IF_ERROR(data_svc_->Init());
- RETURN_IF_ERROR(KrpcStreamMgr()->Init(data_svc_->mem_tracker()));
- // Bump thread cache to 1GB to reduce contention for TCMalloc central
- // list's spinlock.
- if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
- FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
- }
+ krpc_address_.__set_hostname(ip_address_);
+ // Initialization needs to happen in the following order due to dependencies:
+ // - RPC manager, DataStreamService and DataStreamManager.
+ RETURN_IF_ERROR(rpc_mgr_->Init());
+ data_svc_.reset(new DataStreamService(rpc_metrics_));
+ RETURN_IF_ERROR(data_svc_->Init());
+ RETURN_IF_ERROR(stream_mgr_->Init(data_svc_->mem_tracker()));
+ // Bump thread cache to 1GB to reduce contention for TCMalloc central
+ // list's spinlock.
+ if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
+ FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
}
#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
@@ -396,10 +386,8 @@ Status ExecEnv::StartStatestoreSubscriberService() {
}
Status ExecEnv::StartKrpcService() {
- if (FLAGS_use_krpc) {
- LOG(INFO) << "Starting KRPC service";
- RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
- }
+ LOG(INFO) << "Starting KRPC service";
+ RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
return Status::OK();
}
@@ -440,14 +428,4 @@ Status ExecEnv::GetKuduClient(
return Status::OK();
}
-DataStreamMgr* ExecEnv::ThriftStreamMgr() {
- DCHECK(!FLAGS_use_krpc);
- return dynamic_cast<DataStreamMgr*>(stream_mgr_.get());
-}
-
-KrpcDataStreamMgr* ExecEnv::KrpcStreamMgr() {
- DCHECK(FLAGS_use_krpc);
- return dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_.get());
-}
-
} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 54a042a..3832d0d 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -41,8 +41,6 @@ namespace impala {
class AdmissionController;
class BufferPool;
class CallableThreadPool;
-class DataStreamMgrBase;
-class DataStreamMgr;
class DataStreamService;
class QueryExecMgr;
class Frontend;
@@ -109,13 +107,7 @@ class ExecEnv {
/// StartServices() was successful.
TNetworkAddress GetThriftBackendAddress() const;
- DataStreamMgrBase* stream_mgr() { return stream_mgr_.get(); }
-
- /// TODO: Remove once a single DataStreamMgrBase implementation is standardized on.
- /// Clients of DataStreamMgrBase should use stream_mgr() unless they need to access
- /// members that are not a part of the DataStreamMgrBase interface.
- DataStreamMgr* ThriftStreamMgr();
- KrpcDataStreamMgr* KrpcStreamMgr();
+ KrpcDataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
ImpalaBackendClientCache* impalad_client_cache() {
return impalad_client_cache_.get();
@@ -175,7 +167,7 @@ class ExecEnv {
private:
boost::scoped_ptr<ObjectPool> obj_pool_;
boost::scoped_ptr<MetricGroup> metrics_;
- boost::scoped_ptr<DataStreamMgrBase> stream_mgr_;
+ boost::scoped_ptr<KrpcDataStreamMgr> stream_mgr_;
boost::scoped_ptr<Scheduler> scheduler_;
boost::scoped_ptr<AdmissionController> admission_controller_;
boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index af03a2b..11122c7 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -34,14 +34,14 @@
#include "exec/scan-node.h"
#include "runtime/exec-env.h"
#include "runtime/backend-client.h"
-#include "runtime/runtime-filter-bank.h"
#include "runtime/client-cache.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/runtime-state.h"
+#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/query-state.h"
#include "runtime/query-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
+#include "runtime/runtime-filter-bank.h"
+#include "runtime/runtime-state.h"
#include "runtime/thread-resource-mgr.h"
#include "scheduling/query-schedule.h"
#include "util/debug-util.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index bc490dd..3b11c07 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -52,8 +52,8 @@
/// TODO: We don't need millisecond precision here.
const int32_t STREAM_EXPIRATION_TIME_MS = 300 * 1000;
-DECLARE_bool(use_krpc);
-DECLARE_int32(datastream_sender_timeout_ms);
+DEFINE_int32(datastream_sender_timeout_ms, 120000, "(Advanced) The time, in ms, that can "
+ "elapse before a plan fragment will time-out trying to send the initial row batch.");
DEFINE_int32(datastream_service_num_deserialization_threads, 16,
"Number of threads for deserializing RPC requests deferred due to the receiver "
"not ready or the soft limit of the receiver is reached.");
@@ -98,7 +98,7 @@ inline uint32_t KrpcDataStreamMgr::GetHashValue(
return value;
}
-shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
+shared_ptr<KrpcDataStreamRecvr> KrpcDataStreamMgr::CreateRecvr(
const RowDescriptor* row_desc, const TUniqueId& finst_id, PlanNodeId dest_node_id,
int num_senders, int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
MemTracker* parent_tracker, BufferPool::ClientHandle* client) {
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index 3cd2191..5d7bc56 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -18,8 +18,6 @@
#ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H
#define IMPALA_RUNTIME_KRPC_DATA_STREAM_MGR_H
-#include "runtime/data-stream-mgr-base.h"
-
#include <list>
#include <queue>
#include <set>
@@ -30,7 +28,6 @@
#include "common/status.h"
#include "common/object-pool.h"
-#include "runtime/data-stream-mgr-base.h"
#include "runtime/descriptors.h" // for PlanNodeId
#include "runtime/row-batch.h"
#include "util/metrics.h"
@@ -225,7 +222,7 @@ struct EndDataStreamCtx {
/// time.
/// 'total-senders-timedout-waiting-for-recvr-creation' - total number of senders that
/// timed-out while waiting for a receiver.
-class KrpcDataStreamMgr : public DataStreamMgrBase {
+class KrpcDataStreamMgr : public CacheLineAligned {
public:
KrpcDataStreamMgr(MetricGroup* metrics);
@@ -243,10 +240,10 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
/// Ownership of the receiver is shared between this DataStream mgr instance and the
/// caller. 'client' is the BufferPool's client handle for allocating buffers.
/// It's owned by the parent exchange node.
- std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+ std::shared_ptr<KrpcDataStreamRecvr> CreateRecvr(const RowDescriptor* row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
- MemTracker* parent_tracker, BufferPool::ClientHandle* client) override;
+ MemTracker* parent_tracker, BufferPool::ClientHandle* client);
/// Handler for TransmitData() RPC.
///
@@ -286,7 +283,7 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
/// Cancels all receivers registered for fragment_instance_id immediately. The
/// receivers will not accept any row batches after being cancelled. Any buffered
/// row batches will not be freed until Close() is called on the receivers.
- void Cancel(const TUniqueId& fragment_instance_id) override;
+ void Cancel(const TUniqueId& fragment_instance_id);
/// Waits for maintenance thread and sender response thread pool to finish.
~KrpcDataStreamMgr();
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 3933e02..96cc25f 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -43,7 +43,6 @@
#include "common/names.h"
-DECLARE_bool(use_krpc);
DECLARE_int32(datastream_service_num_deserialization_threads);
using kudu::MonoDelta;
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index 18454d7..e19f686 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -18,8 +18,6 @@
#ifndef IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H
#define IMPALA_RUNTIME_KRPC_DATA_STREAM_RECVR_H
-#include "data-stream-recvr-base.h"
-
#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>
@@ -84,7 +82,7 @@ class TransmitDataResponsePB;
/// - no new row batch or deferred RPCs should be added to a cancelled sender queue
/// - Cancel() will drain the deferred RPCs queue and the row batch queue
///
-class KrpcDataStreamRecvr : public DataStreamRecvrBase {
+class KrpcDataStreamRecvr {
public:
~KrpcDataStreamRecvr();
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 6928ebc..5cc80ce 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -34,10 +34,10 @@
#include "exprs/timezone_db.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/reservation-tracker.h"
-#include "runtime/data-stream-mgr-base.h"
-#include "runtime/data-stream-recvr.h"
#include "runtime/descriptors.h"
#include "runtime/exec-env.h"
+#include "runtime/krpc-data-stream-mgr.h"
+#include "runtime/krpc-data-stream-recvr.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
#include "runtime/runtime-filter-bank.h"
@@ -302,7 +302,7 @@ io::DiskIoMgr* RuntimeState::io_mgr() {
return exec_env_->disk_io_mgr();
}
-DataStreamMgrBase* RuntimeState::stream_mgr() {
+KrpcDataStreamMgr* RuntimeState::stream_mgr() {
return exec_env_->stream_mgr();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 6d38fc6..78a9864 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -36,6 +36,7 @@ class BufferPool;
class DataStreamRecvr;
class DescriptorTbl;
class Expr;
+class KrpcDataStreamMgr;
class LlvmCodeGen;
class MemTracker;
class ObjectPool;
@@ -47,7 +48,6 @@ class TimestampValue;
class ThreadResourcePool;
class TUniqueId;
class ExecEnv;
-class DataStreamMgrBase;
class HBaseTableFactory;
class TPlanFragmentCtx;
class TPlanFragmentInstanceCtx;
@@ -107,7 +107,7 @@ class RuntimeState {
: no_instance_id_;
}
ExecEnv* exec_env() { return exec_env_; }
- DataStreamMgrBase* stream_mgr();
+ KrpcDataStreamMgr* stream_mgr();
HBaseTableFactory* htable_factory();
ImpalaBackendClientCache* impalad_client_cache();
CatalogServiceClientCache* catalogd_client_cache();
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 2baffbb..20acc43 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -46,8 +46,6 @@ using namespace apache::thrift;
using namespace org::apache::impala::fb;
using namespace strings;
-DECLARE_bool(use_krpc);
-
namespace impala {
static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
@@ -75,12 +73,10 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
// requests.
local_backend_descriptor_.ip_address = ip;
LOG(INFO) << "Scheduler using " << ip << " as IP address";
- if (FLAGS_use_krpc) {
- // KRPC relies on resolved IP address.
- DCHECK(IsResolvedAddress(krpc_address));
- DCHECK_EQ(krpc_address.hostname, ip);
- local_backend_descriptor_.__set_krpc_address(krpc_address);
- }
+ // KRPC relies on resolved IP address.
+ DCHECK(IsResolvedAddress(krpc_address));
+ DCHECK_EQ(krpc_address.hostname, ip);
+ local_backend_descriptor_.__set_krpc_address(krpc_address);
coord_only_backend_config_.AddBackend(local_backend_descriptor_);
@@ -346,12 +342,10 @@ void Scheduler::ComputeFragmentExecParams(
dest.__set_fragment_instance_id(dest_params->instance_exec_params[i].instance_id);
const TNetworkAddress& host = dest_params->instance_exec_params[i].host;
dest.__set_thrift_backend(host);
- if (FLAGS_use_krpc) {
- const TBackendDescriptor& desc = LookUpBackendDesc(executor_config, host);
- DCHECK(desc.__isset.krpc_address);
- DCHECK(IsResolvedAddress(desc.krpc_address));
- dest.__set_krpc_backend(desc.krpc_address);
- }
+ const TBackendDescriptor& desc = LookUpBackendDesc(executor_config, host);
+ DCHECK(desc.__isset.krpc_address);
+ DCHECK(IsResolvedAddress(desc.krpc_address));
+ dest.__set_krpc_backend(desc.krpc_address);
}
// enumerate senders consecutively;
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 1d42a99..b7892ff 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -74,14 +74,14 @@ Status DataStreamService::Init() {
void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request,
EndDataStreamResponsePB* response, RpcContext* rpc_context) {
// CloseSender() is guaranteed to eventually respond to this RPC so we don't do it here.
- ExecEnv::GetInstance()->KrpcStreamMgr()->CloseSender(request, response, rpc_context);
+ ExecEnv::GetInstance()->stream_mgr()->CloseSender(request, response, rpc_context);
}
void DataStreamService::TransmitData(const TransmitDataRequestPB* request,
TransmitDataResponsePB* response, RpcContext* rpc_context) {
FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
// AddData() is guaranteed to eventually respond to this RPC so we don't do it here.
- ExecEnv::GetInstance()->KrpcStreamMgr()->AddData(request, response, rpc_context);
+ ExecEnv::GetInstance()->stream_mgr()->AddData(request, response, rpc_context);
}
template<typename ResponsePBType>
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-internal-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index 53a62da..c479a7f 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -80,15 +80,6 @@ void ImpalaInternalService::ReportExecStatus(TReportExecStatusResult& return_val
impala_server_->ReportExecStatus(return_val, params);
}
-void ImpalaInternalService::TransmitData(TTransmitDataResult& return_val,
- const TTransmitDataParams& params) {
- FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
- DCHECK(params.__isset.dest_fragment_instance_id);
- DCHECK(params.__isset.sender_id);
- DCHECK(params.__isset.dest_node_id);
- impala_server_->TransmitData(return_val, params);
-}
-
void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
const TUpdateFilterParams& params) {
FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 3285d9d..8d5ddd5 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -37,8 +37,6 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
const TCancelQueryFInstancesParams& params);
virtual void ReportExecStatus(TReportExecStatusResult& return_val,
const TReportExecStatusParams& params);
- virtual void TransmitData(TTransmitDataResult& return_val,
- const TTransmitDataParams& params);
virtual void UpdateFilter(TUpdateFilterResult& return_val,
const TUpdateFilterParams& params);
virtual void PublishFilter(TPublishFilterResult& return_val,
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 97fa70e..077f634 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -52,7 +52,6 @@
#include "rpc/thrift-util.h"
#include "runtime/client-cache.h"
#include "runtime/coordinator.h"
-#include "runtime/data-stream-mgr.h"
#include "runtime/exec-env.h"
#include "runtime/lib-cache.h"
#include "runtime/mem-tracker.h"
@@ -119,7 +118,6 @@ DECLARE_string(authorized_proxy_group_config);
DECLARE_string(authorized_proxy_group_config_delimiter);
DECLARE_bool(abort_on_config_error);
DECLARE_bool(disk_spill_encryption);
-DECLARE_bool(use_krpc);
DECLARE_bool(use_local_catalog);
DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served."
@@ -1268,33 +1266,6 @@ void ImpalaServer::ReportExecStatus(
request_state->UpdateBackendExecStatus(params).SetTStatus(&return_val);
}
-void ImpalaServer::TransmitData(
- TTransmitDataResult& return_val, const TTransmitDataParams& params) {
- VLOG_ROW << "TransmitData(): instance_id=" << PrintId(params.dest_fragment_instance_id)
- << " node_id=" << params.dest_node_id
- << " #rows=" << params.row_batch.num_rows
- << " sender_id=" << params.sender_id
- << " eos=" << (params.eos ? "true" : "false");
- // TODO: fix Thrift so we can simply take ownership of thrift_batch instead
- // of having to copy its data
- if (params.row_batch.num_rows > 0) {
- Status status = exec_env_->ThriftStreamMgr()->AddData(
- params.dest_fragment_instance_id, params.dest_node_id, params.row_batch,
- params.sender_id);
- status.SetTStatus(&return_val);
- if (!status.ok()) {
- // should we close the channel here as well?
- return;
- }
- }
-
- if (params.eos) {
- exec_env_->ThriftStreamMgr()->CloseSender(
- params.dest_fragment_instance_id, params.dest_node_id,
- params.sender_id).SetTStatus(&return_val);
- }
-}
-
void ImpalaServer::InitializeConfigVariables() {
// Set idle_session_timeout here to let the SET command return the value of
// the command line option FLAGS_idle_session_timeout
@@ -1755,11 +1726,10 @@ void ImpalaServer::AddLocalBackendToStatestore(
local_backend_descriptor.ip_address = exec_env_->ip_address();
local_backend_descriptor.__set_proc_mem_limit(
exec_env_->process_mem_tracker()->limit());
- if (FLAGS_use_krpc) {
- const TNetworkAddress& krpc_address = exec_env_->krpc_address();
- DCHECK(IsResolvedAddress(krpc_address));
- local_backend_descriptor.__set_krpc_address(krpc_address);
- }
+ const TNetworkAddress& krpc_address = exec_env_->krpc_address();
+ DCHECK(IsResolvedAddress(krpc_address));
+ local_backend_descriptor.__set_krpc_address(krpc_address);
+
subscriber_topic_updates->emplace_back(TTopicDelta());
TTopicDelta& update = subscriber_topic_updates->back();
update.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index a31734b..8b7af09 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -59,8 +59,6 @@ class TPlanExecParams;
class TInsertResult;
class TReportExecStatusArgs;
class TReportExecStatusResult;
-class TTransmitDataArgs;
-class TTransmitDataResult;
class TNetworkAddress;
class TClientRequest;
class TExecRequest;
@@ -273,8 +271,6 @@ class ImpalaServer : public ImpalaServiceIf,
/// ImpalaInternalService rpcs
void ReportExecStatus(TReportExecStatusResult& return_val,
const TReportExecStatusParams& params);
- void TransmitData(TTransmitDataResult& return_val,
- const TTransmitDataParams& params);
void UpdateFilter(TUpdateFilterResult& return_val,
const TUpdateFilterParams& params);
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/bin/run-all-tests.sh
----------------------------------------------------------------------
diff --git a/bin/run-all-tests.sh b/bin/run-all-tests.sh
index 83b3548..5f6831e 100755
--- a/bin/run-all-tests.sh
+++ b/bin/run-all-tests.sh
@@ -39,8 +39,6 @@ if "${CLUSTER_DIR}/admin" is_kerberized; then
fi
# Parametrized Test Options
-# Disable KRPC for test cluster and test execution
-: ${DISABLE_KRPC:=false}
# Run FE Tests
: ${FE_TEST:=true}
# Run Backend Tests
@@ -77,11 +75,6 @@ if [[ "${ERASURE_CODING}" = true ]]; then
--impalad_args=--default_query_options=allow_erasure_coded_files=true"
fi
-# If KRPC tests are disabled, pass the flag to disable KRPC during cluster start.
-if [[ "${DISABLE_KRPC}" == "true" ]]; then
- TEST_START_CLUSTER_ARGS="${TEST_START_CLUSTER_ARGS} --disable_krpc"
-fi
-
# Indicates whether code coverage reports should be generated.
: ${CODE_COVERAGE:=false}
@@ -130,12 +123,6 @@ if [[ "${TARGET_FILESYSTEM}" == "local" ]]; then
COMMON_PYTEST_ARGS+=" --impalad=localhost:21000"
fi
-# If KRPC tests are disabled, pass test_no_krpc flag to pytest.
-# This includes the end-to-end tests and the custom cluster tests.
-if [[ "${DISABLE_KRPC}" == "true" ]]; then
- COMMON_PYTEST_ARGS+=" --test_no_krpc"
-fi
-
# For logging when using run-step.
LOG_DIR="${IMPALA_EE_TEST_LOGS_DIR}"
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 4d34d45..208cc35 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -60,8 +60,6 @@ parser.add_option("--state_store_args", dest="state_store_args", action="append"
parser.add_option("--catalogd_args", dest="catalogd_args", action="append",
type="string", default=[],
help="Additional arguments to pass to the Catalog Service at startup")
-parser.add_option("--disable_krpc", dest="disable_krpc", action="store_true",
- default=False, help="Disable KRPC DataStream service during startup.")
parser.add_option("--kill", "--kill_only", dest="kill_only", action="store_true",
default=False, help="Instead of starting the cluster, just kill all"
" the running impalads and the statestored.")
@@ -330,9 +328,6 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
delay=delay_list[i],
args=args)
- if options.disable_krpc:
- args = "-use_krpc=false {args}".format(args=args)
-
# Appended at the end so they can override previous args.
if i < len(per_impalad_args):
args = "{args} {per_impalad_args}".format(
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 4797eba..bf48aaa 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -704,34 +704,6 @@ struct TCancelQueryFInstancesResult {
1: optional Status.TStatus status
}
-
-// TransmitData
-
-struct TTransmitDataParams {
- 1: required ImpalaInternalServiceVersion protocol_version
-
- // required in V1
- 2: optional Types.TUniqueId dest_fragment_instance_id
-
- // Id of this fragment in its role as a sender.
- 3: optional i32 sender_id
-
- // required in V1
- 4: optional Types.TPlanNodeId dest_node_id
-
- // optional in V1
- 5: optional Results.TRowBatch row_batch
-
- // if set to true, indicates that no more row batches will be sent
- // for this dest_node_id
- 6: optional bool eos
-}
-
-struct TTransmitDataResult {
- // required in V1
- 1: optional Status.TStatus status
-}
-
// Parameters for RequestPoolService.resolveRequestPool()
// TODO: why is this here?
struct TResolveRequestPoolParams {
@@ -881,10 +853,6 @@ service ImpalaInternalService {
TCancelQueryFInstancesResult CancelQueryFInstances(
1:TCancelQueryFInstancesParams params);
- // Called by sender to transmit single row batch. Returns error indication
- // if params.fragmentId or params.destNodeId are unknown or if data couldn't be read.
- TTransmitDataResult TransmitData(1:TTransmitDataParams params);
-
// Called by fragment instances that produce local runtime filters to deliver them to
// the coordinator for aggregation and broadcast.
TUpdateFilterResult UpdateFilter(1:TUpdateFilterParams params);
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index a5c285d..5947ce9 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -145,9 +145,6 @@ class CustomClusterTestSuite(ImpalaTestSuite):
if use_exclusive_coordinators:
cmd.append("--use_exclusive_coordinators")
- if pytest.config.option.test_no_krpc:
- cmd.append("--disable_krpc")
-
if os.environ.get("ERASURE_CODING") == "true":
cmd.append("--impalad_args=--default_query_options=allow_erasure_coded_files=true")
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/common/skip.py
----------------------------------------------------------------------
diff --git a/tests/common/skip.py b/tests/common/skip.py
index e84c75b..d23b260 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -89,10 +89,6 @@ class SkipIf:
not_ec = pytest.mark.skipif(not IS_EC, reason="Erasure Coding needed")
no_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM,
reason="Secondary filesystem needed")
- not_krpc = pytest.mark.skipif(pytest.config.option.test_no_krpc,
- reason="Test is only supported when using KRPC.")
- not_thrift = pytest.mark.skipif(not pytest.config.option.test_no_krpc,
- reason="Test is only supported when using Thrift RPC.")
class SkipIfIsilon:
caching = pytest.mark.skipif(IS_ISILON, reason="SET CACHED not implemented for Isilon")
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/common/test_skip.py
----------------------------------------------------------------------
diff --git a/tests/common/test_skip.py b/tests/common/test_skip.py
deleted file mode 100644
index 3e7d281..0000000
--- a/tests/common/test_skip.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pytest
-
-from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIf
-
-class TestSkipIf(ImpalaTestSuite):
- """
- This suite tests the effectiveness of various SkipIf decorators.
- TODO: Remove this once we have tests that make use of these decorators.
- """
-
- @classmethod
- def get_workload(cls):
- return 'functional-query'
-
- @SkipIf.not_krpc
- def test_skip_if_not_krpc(self):
- assert not pytest.config.option.test_no_krpc
-
- @SkipIf.not_thrift
- def test_skip_if_not_thrift(self):
- assert pytest.config.option.test_no_krpc
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/conftest.py b/tests/conftest.py
index a1f1859..f01ecb2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -116,10 +116,6 @@ def pytest_addoption(parser):
help=("Indicates that tests are being run against a remote cluster. "
"Some tests may be marked to skip or xfail on remote clusters."))
- parser.addoption("--test_no_krpc", dest="test_no_krpc", action="store_true",
- default=False, help="Run all tests with KRPC disabled. This assumes "
- "that the test cluster has been started with --disable_krpc.")
-
parser.addoption("--shard_tests", default=None,
help="If set to N/M (e.g., 3/5), will split the tests into "
"M partitions and run the Nth partition. 1-indexed.")
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/custom_cluster/test_krpc_mem_usage.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_mem_usage.py b/tests/custom_cluster/test_krpc_mem_usage.py
index d358baa..6b0fe6a 100644
--- a/tests/custom_cluster/test_krpc_mem_usage.py
+++ b/tests/custom_cluster/test_krpc_mem_usage.py
@@ -26,7 +26,6 @@ DATA_STREAM_MGR_METRIC = "Data Stream Manager Early RPCs"
DATA_STREAM_SVC_METRIC = "Data Stream Service Queue"
ALL_METRICS = [ DATA_STREAM_MGR_METRIC, DATA_STREAM_SVC_METRIC ]
-@SkipIf.not_krpc
class TestKrpcMemUsage(CustomClusterTestSuite):
"""Test for memory usage tracking when using KRPC."""
TEST_QUERY = "select count(c2.string_col) from \
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/custom_cluster/test_krpc_metrics.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_metrics.py b/tests/custom_cluster/test_krpc_metrics.py
index de16ccc..e728237 100644
--- a/tests/custom_cluster/test_krpc_metrics.py
+++ b/tests/custom_cluster/test_krpc_metrics.py
@@ -24,7 +24,6 @@ from tests.common.impala_cluster import ImpalaCluster
from tests.common.skip import SkipIf, SkipIfBuildType
from tests.verifiers.mem_usage_verifier import MemUsageVerifier
-@SkipIf.not_krpc
class TestKrpcMetrics(CustomClusterTestSuite):
"""Test for KRPC metrics that require special arguments during cluster startup."""
RPCZ_URL = 'http://localhost:25000/rpcz?json'
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/custom_cluster/test_rpc_exception.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_rpc_exception.py b/tests/custom_cluster/test_rpc_exception.py
index 2784e88..fc60c1a 100644
--- a/tests/custom_cluster/test_rpc_exception.py
+++ b/tests/custom_cluster/test_rpc_exception.py
@@ -68,12 +68,6 @@ class TestRPCException(CustomClusterTestSuite):
def test_rpc_send_timed_out(self, vector):
self.execute_test_query(None)
- @SkipIf.not_thrift
- @pytest.mark.execute_serially
- @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4")
- def test_rpc_recv_closed_connection(self, vector):
- self.execute_test_query("Called read on non-open socket")
-
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5")
def test_rpc_recv_timed_out(self, vector):
@@ -94,12 +88,6 @@ class TestRPCException(CustomClusterTestSuite):
def test_rpc_secure_send_timed_out(self, vector):
self.execute_test_query(None)
- @SkipIf.not_thrift
- @pytest.mark.execute_serially
- @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=9")
- def test_rpc_secure_recv_closed_connection(self, vector):
- self.execute_test_query("TTransportException: Transport not open")
-
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=10")
def test_rpc_secure_recv_timed_out(self, vector):
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/query_test/test_codegen.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_codegen.py b/tests/query_test/test_codegen.py
index 5bc0cc6..7436c13 100644
--- a/tests/query_test/test_codegen.py
+++ b/tests/query_test/test_codegen.py
@@ -52,7 +52,6 @@ class TestCodegen(ImpalaTestSuite):
assert len(exec_options) > 0
assert_codegen_enabled(result.runtime_profile, [1])
- @SkipIf.not_krpc
def test_datastream_sender_codegen(self, vector):
"""Test the KrpcDataStreamSender's codegen logic"""
self.run_test_case('QueryTest/datastream-sender-codegen', vector)
http://git-wip-us.apache.org/repos/asf/impala/blob/8d7f6386/tests/webserver/test_web_pages.py
----------------------------------------------------------------------
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 60deca4..000e386 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -249,7 +249,6 @@ class TestWebPage(ImpalaTestSuite):
assert any(pattern in t for t in thread_names), \
"Could not find thread matching '%s'" % pattern
- @SkipIf.not_krpc
def test_krpc_rpcz(self):
"""Test that KRPC metrics are exposed in /rpcz and that they are updated when
executing a query."""