You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/02/28 08:21:07 UTC

[1/3] impala git commit: IMPALA-6512: Maintenance thread period should respect FLAGS_datastream_sender_timeout_ms

Repository: impala
Updated Branches:
  refs/heads/master 61ca7be6e -> d57fbec6f


IMPALA-6512: Maintenance thread period should respect FLAGS_datastream_sender_timeout_ms

Previously, the maintenance thread in KrpcDataStreamMgr would wake up
once every 10s to check for early senders which timed out. However,
FLAGS_datastream_sender_timeout_ms can be set to a value smaller than
10s, in which case we may not notice the timed-out senders until much
later. This change fixes the problem by changing the wakeup period of
the maintenance thread to be the min of FLAGS_datastream_sender_timeout_ms/2
(clamped to at least 1 millisecond) and 10000 milliseconds. Also, this
change addresses a TODO in the code by moving the check for closed
receivers in the 'closed_stream_cache_' from the handler for EOS RPC to
the maintenance thread's loop.

Testing done: test_exchange_large_delay.py which failed previously
when KRPC is enabled. Core debug builds with KRPC enabled.

Change-Id: I804cef7cc991007ec44375f8eac804aa2df46bd7
Reviewed-on: http://gerrit.cloudera.org:8080/9447
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/065974cc
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/065974cc
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/065974cc

Branch: refs/heads/master
Commit: 065974ccc5dad26794610dc81617c71a2b3e9911
Parents: 61ca7be
Author: Michael Ho <kw...@cloudera.com>
Authored: Sat Feb 24 14:14:54 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 27 22:29:32 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-test.cc     |  1 +
 be/src/runtime/krpc-data-stream-mgr.cc | 48 ++++++++++++++---------------
 2 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/065974cc/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 11374a2..bd76b57 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -699,6 +699,7 @@ class DataStreamTestThriftOnly : public DataStreamTest {
 class DataStreamTestShortDeserQueue : public DataStreamTest {
  protected:
   virtual void SetUp() {
+    FLAGS_datastream_sender_timeout_ms = 10000;
     FLAGS_datastream_service_num_deserialization_threads = 1;
     FLAGS_datastream_service_deserialization_queue_size = 1;
     DataStreamTest::SetUp();

http://git-wip-us.apache.org/repos/asf/impala/blob/065974cc/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 184ec49..02609c8 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -285,27 +285,6 @@ void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,
   // already. In either cases, it's safe to just return an OK status.
   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
   RespondAndReleaseRpc(Status::OK(), response, rpc_context, service_mem_tracker_);
-
-  {
-    // TODO: Move this to maintenance thread.
-    // Remove any closed streams that have been in the cache for more than
-    // STREAM_EXPIRATION_TIME_MS.
-    lock_guard<mutex> l(lock_);
-    ClosedStreamMap::iterator it = closed_stream_expirations_.begin();
-    int64_t now = MonotonicMillis();
-    int32_t before = closed_stream_cache_.size();
-    while (it != closed_stream_expirations_.end() && it->first < now) {
-      closed_stream_cache_.erase(it->second);
-      closed_stream_expirations_.erase(it++);
-    }
-    DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size());
-    int32_t after = closed_stream_cache_.size();
-    if (before != after) {
-      VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to " << after
-                 << ", eviction took: "
-                 << PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
-    }
-  }
 }
 
 Status KrpcDataStreamMgr::DeregisterRecvr(
@@ -385,12 +364,15 @@ void KrpcDataStreamMgr::RespondAndReleaseRpc(const Status& status,
 }
 
 void KrpcDataStreamMgr::Maintenance() {
+  const int32_t sleep_time_ms =
+      min(max(1, FLAGS_datastream_sender_timeout_ms / 2), 10000);
   while (true) {
+    const int64_t now = MonotonicMillis();
+
     // Notify any senders that have been waiting too long for their receiver to
     // appear. Keep lock_ held for only a short amount of time.
     vector<EarlySendersList> timed_out_senders;
     {
-      int64_t now = MonotonicMillis();
       lock_guard<mutex> l(lock_);
       auto it = early_senders_map_.begin();
       while (it != early_senders_map_.end()) {
@@ -415,9 +397,27 @@ void KrpcDataStreamMgr::Maintenance() {
         RespondToTimedOutSender<EndDataStreamCtx, EndDataStreamRequestPB>(ctx);
       }
     }
+
+    // Remove any closed streams that have been in the cache for more than
+    // STREAM_EXPIRATION_TIME_MS.
+    {
+      lock_guard<mutex> l(lock_);
+      ClosedStreamMap::iterator it = closed_stream_expirations_.begin();
+      int32_t before = closed_stream_cache_.size();
+      while (it != closed_stream_expirations_.end() && it->first < now) {
+        closed_stream_cache_.erase(it->second);
+        closed_stream_expirations_.erase(it++);
+      }
+      DCHECK_EQ(closed_stream_cache_.size(), closed_stream_expirations_.size());
+      int32_t after = closed_stream_cache_.size();
+      if (before != after) {
+        VLOG_QUERY << "Reduced stream ID cache from " << before << " items, to " << after
+                   << ", eviction took: "
+                   << PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
+      }
+    }
     bool timed_out = false;
-    // Wait for 10s
-    shutdown_promise_.Get(10000, &timed_out);
+    shutdown_promise_.Get(sleep_time_ms, &timed_out);
     if (!timed_out) return;
   }
 }


[2/3] impala git commit: IMPALA-6565: Fix some bugs in KrpcDataStreamRecvr

Posted by kw...@apache.org.
IMPALA-6565: Fix some bugs in KrpcDataStreamRecvr

This change fixes a couple of issues found with stress test
(concurrent_select.py)

1. The local variable 'status' was mistakenly defined twice in
DequeueDeferredRpc(): one in the outer scope and one in the
inner scope. This causes the error status of AddBatchWork()
to be dropped when the inner scope ends. As a result, the error
status from AddBatchWork() (e.g. MemLimitExceeded) will not be
propagated back to the sender and the receiver will continue
to operate with some missing data, leading to wrong query
results. This change fixes the problem by removing the redefinition
of the status local variable. It also adds some counters in the
profile to make diagnostics of failed RPCs or missing EOS easier.

2. AddBatchWork() may return early without enqueuing a row
batch if it encounters a failure creating a row batch. The bug
is that it may return early without notifying threads waiting
on 'data_arrival_cv_', causing threads waiting for
'num_pending_enqueue_' to reach 0 to wait indefinitely. This may
cause some fragment instances to stick around after the query
has been cancelled.

3. This one did not manifest during the stress test and it's benign.
There is a missing check for 'is_cancelled_' in TakeOverEarlySender()
before enqueuing an early sender into the 'deferred_rpcs_' queue.
The bug is benign because TakeOverEarlySender() is called in the
fragment execution thread's context, so that thread cannot close the
receiver and call CancelStream() on the receiver until it returns
from CreateRecvr(), unless the query is cancelled. In that case,
the sender should also get the cancellation. That said, it should
be fixed. This change checks for the 'is_cancelled_' flag before
enqueuing an early sender into the 'deferred_rpcs_' queue.

Testing done: Stress test consistently reproduced the problems before.
With this fix, no wrong results have been seen after 2 iterations
of stress tests, which translates to about 20000 queries being run.

Change-Id: I6b2985a47021ebd4a970861040e7474aca7941b5
Reviewed-on: http://gerrit.cloudera.org:8080/9439
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4c8b02ff
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4c8b02ff
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4c8b02ff

Branch: refs/heads/master
Commit: 4c8b02ffc43e8346440a45f6fee7b2b761a006fd
Parents: 065974c
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Feb 22 16:19:50 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 01:17:29 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/krpc-data-stream-recvr.cc  | 34 ++++++++++++++++++++------
 be/src/runtime/krpc-data-stream-recvr.h   | 19 +++++++++++---
 be/src/runtime/krpc-data-stream-sender.cc |  5 ++++
 be/src/runtime/krpc-data-stream-sender.h  |  6 +++++
 4 files changed, 53 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4c8b02ff/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 2f379f4..4f85996 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -298,7 +298,7 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   DCHECK(lock != nullptr);
   DCHECK(lock->owns_lock());
 
-  COUNTER_ADD(recvr_->num_accepted_batches_, 1);
+  COUNTER_ADD(recvr_->num_received_batches_, 1);
   COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
   // Reserve queue space before dropping the lock below.
   recvr_->num_buffered_bytes_.Add(batch_size);
@@ -324,9 +324,11 @@ Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   --num_pending_enqueue_;
   if (UNLIKELY(!status.ok())) {
     recvr_->num_buffered_bytes_.Add(-batch_size);
+    data_arrival_cv_.notify_one();
     return status;
   }
   VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
+  COUNTER_ADD(recvr_->num_enqueued_batches_, 1);
   batch_queue_.emplace_back(batch_size, move(batch));
   data_arrival_cv_.notify_one();
   return Status::OK();
@@ -347,6 +349,7 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
   kudu::Slice tuple_offsets;
   kudu::Slice tuple_data;
   int64_t batch_size;
+  COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   Status status = UnpackRequest(request, rpc_context, &tuple_offsets, &tuple_data,
       &batch_size);
   if (UNLIKELY(!status.ok())) {
@@ -400,15 +403,16 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     DCHECK_GT(num_deserialize_tasks_pending_, 0);
     --num_deserialize_tasks_pending_;
 
-    // Returns if the queue has been cancelled or if it's empty.
-    if (UNLIKELY(is_cancelled_) || deferred_rpcs_.empty()) return;
+    // Returns if the queue is empty. The queue may be drained in Cancel().
+    if (deferred_rpcs_.empty()) return;
+    DCHECK(!is_cancelled_);
 
     // Try enqueuing the first entry into 'batch_queue_'.
     ctx.swap(deferred_rpcs_.front());
     kudu::Slice tuple_offsets;
     kudu::Slice tuple_data;
     int64_t batch_size;
-    Status status = UnpackRequest(ctx->request, ctx->rpc_context, &tuple_offsets,
+    status = UnpackRequest(ctx->request, ctx->rpc_context, &tuple_offsets,
         &tuple_data, &batch_size);
     // Reply with error status if the entry cannot be unpacked.
     if (UNLIKELY(!status.ok())) {
@@ -429,6 +433,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     deferred_rpcs_.pop();
     const RowBatchHeaderPB& header = ctx->request->row_batch_header();
     status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+    DCHECK(!status.ok() || !batch_queue_.empty());
   }
 
   // Responds to the sender to ack the insertion of the row batches.
@@ -439,12 +444,17 @@ void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
     unique_ptr<TransmitDataCtx> ctx) {
   int sender_id = ctx->request->sender_id();
   recvr_->mem_tracker()->Consume(ctx->rpc_context->GetTransferSize());
-  COUNTER_ADD(recvr_->num_deferred_batches_, 1);
+  COUNTER_ADD(recvr_->num_arrived_batches_, 1);
   {
     lock_guard<SpinLock> l(lock_);
+    if (UNLIKELY(is_cancelled_)) {
+      RespondAndReleaseRpc(Status::OK(), ctx);
+      return;
+    }
     deferred_rpcs_.push(move(ctx));
     ++num_deserialize_tasks_pending_;
   }
+  COUNTER_ADD(recvr_->num_deferred_batches_, 1);
   recvr_->mgr_->EnqueueDeserializeTask(recvr_->fragment_instance_id(),
       recvr_->dest_node_id(), sender_id, 1);
 }
@@ -489,6 +499,9 @@ void KrpcDataStreamRecvr::SenderQueue::Close() {
   // risk running into a race which can leak row batches. Please see IMPALA-3034.
   DCHECK(is_cancelled_);
 
+  // The deferred RPCs should all have been responded to in Cancel().
+  DCHECK(deferred_rpcs_.empty());
+
   // Wait for any pending insertion to complete first.
   while (num_pending_enqueue_ > 0) data_arrival_cv_.wait(l);
 
@@ -566,10 +579,16 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
   inactive_timer_ = profile_->inactive_timer();
   num_early_senders_ =
       ADD_COUNTER(sender_side_profile_, "NumEarlySenders", TUnit::UNIT);
+  num_arrived_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesArrived", TUnit::UNIT);
+  num_received_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesReceived", TUnit::UNIT);
+  num_enqueued_batches_ =
+      ADD_COUNTER(sender_side_profile_, "NumBatchesEnqueued", TUnit::UNIT);
   num_deferred_batches_ =
       ADD_COUNTER(sender_side_profile_, "NumBatchesDeferred", TUnit::UNIT);
-  num_accepted_batches_ =
-      ADD_COUNTER(sender_side_profile_, "NumBatchesAccepted", TUnit::UNIT);
+  num_eos_received_ =
+      ADD_COUNTER(sender_side_profile_, "NumEosReceived", TUnit::UNIT);
 }
 
 Status KrpcDataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {
@@ -600,6 +619,7 @@ void KrpcDataStreamRecvr::TakeOverEarlySender(unique_ptr<TransmitDataCtx> ctx) {
 void KrpcDataStreamRecvr::RemoveSender(int sender_id) {
   int use_sender_id = is_merging_ ? sender_id : 0;
   sender_queues_[use_sender_id]->DecrementSenders();
+  COUNTER_ADD(num_eos_received_, 1);
 }
 
 void KrpcDataStreamRecvr::CancelStream() {

http://git-wip-us.apache.org/repos/asf/impala/blob/4c8b02ff/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index f4c2a5e..758079b 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -195,7 +195,7 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   RuntimeProfile* recvr_side_profile_;
   RuntimeProfile* sender_side_profile_;
 
-  /// Number of bytes received.
+  /// Number of bytes received but not necessarily enqueued.
   RuntimeProfile::Counter* bytes_received_counter_;
 
   /// Time series of number of bytes received, samples bytes_received_counter_
@@ -211,11 +211,22 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// TODO: Turn this into a wall-clock timer.
   RuntimeProfile::Counter* first_batch_wait_total_timer_;
 
-  /// Total number of batches received and deferred as sender queue is full.
+  /// Total number of batches which arrived at this receiver but not necessarily received
+  /// or enqueued. An arrived row batch will eventually be received if there is no error
+  /// unpacking the RPC payload and the receiving stream is not cancelled.
+  RuntimeProfile::Counter* num_arrived_batches_;
+
+  /// Total number of batches received but not necessarily enqueued.
+  RuntimeProfile::Counter* num_received_batches_;
+
+  /// Total number of batches received and enqueued into the row batch queue.
+  RuntimeProfile::Counter* num_enqueued_batches_;
+
+  /// Total number of batches deferred because of early senders or full row batch queue.
   RuntimeProfile::Counter* num_deferred_batches_;
 
-  /// Total number of batches received and accepted into the sender queue.
-  RuntimeProfile::Counter* num_accepted_batches_;
+  /// Total number of EOS received.
+  RuntimeProfile::Counter* num_eos_received_;
 
   /// Total wall-clock time spent waiting for data to arrive in the recv buffer.
   RuntimeProfile::Counter* data_arrival_timer_;

http://git-wip-us.apache.org/repos/asf/impala/blob/4c8b02ff/be/src/runtime/krpc-data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.cc b/be/src/runtime/krpc-data-stream-sender.cc
index 6c0ad01..d758826 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -306,6 +306,7 @@ Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) {
 }
 
 void KrpcDataStreamSender::Channel::MarkDone(const Status& status) {
+  if (UNLIKELY(!status.ok())) COUNTER_ADD(parent_->rpc_failure_counter_, 1);
   rpc_status_ = status;
   rpc_in_flight_ = false;
   rpc_in_flight_batch_ = nullptr;
@@ -535,9 +536,11 @@ Status KrpcDataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
     std::unique_lock<SpinLock> l(lock_);
     RETURN_IF_ERROR(WaitForRpc(&l));
     DCHECK(!rpc_in_flight_);
+    DCHECK(rpc_status_.ok());
     if (UNLIKELY(remote_recvr_closed_)) return Status::OK();
     VLOG_RPC << "calling EndDataStream() to terminate channel.";
     rpc_in_flight_ = true;
+    COUNTER_ADD(parent_->eos_sent_counter_, 1);
     RETURN_IF_ERROR(DoEndDataStreamRpc());
     RETURN_IF_ERROR(WaitForRpc(&l));
   }
@@ -619,7 +622,9 @@ Status KrpcDataStreamSender::Prepare(
       &partition_expr_evals_));
   serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
   rpc_retry_counter_ = ADD_COUNTER(profile(), "RpcRetry", TUnit::UNIT);
+  rpc_failure_counter_ = ADD_COUNTER(profile(), "RpcFailure", TUnit::UNIT);
   bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+  eos_sent_counter_ = ADD_COUNTER(profile(), "EosSent", TUnit::UNIT);
   uncompressed_bytes_counter_ =
       ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
   total_sent_rows_counter_= ADD_COUNTER(profile(), "RowsReturned", TUnit::UNIT);

http://git-wip-us.apache.org/repos/asf/impala/blob/4c8b02ff/be/src/runtime/krpc-data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-sender.h b/be/src/runtime/krpc-data-stream-sender.h
index ce0851e..0c3a32e 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -157,9 +157,15 @@ class KrpcDataStreamSender : public DataSink {
   /// Number of TransmitData() RPC retries due to remote service being busy.
   RuntimeProfile::Counter* rpc_retry_counter_ = nullptr;
 
+  /// Total number of times RPC fails or the remote responds with a non-retryable error.
+  RuntimeProfile::Counter* rpc_failure_counter_ = nullptr;
+
   /// Total number of bytes sent.
   RuntimeProfile::Counter* bytes_sent_counter_ = nullptr;
 
+  /// Total number of EOS sent.
+  RuntimeProfile::Counter* eos_sent_counter_ = nullptr;
+
   /// Total number of bytes of the row batches before compression.
   RuntimeProfile::Counter* uncompressed_bytes_counter_ = nullptr;
 


[3/3] impala git commit: IMPALA-6588: don't add empty list of ranges in text scan

Posted by kw...@apache.org.
IMPALA-6588: don't add empty list of ranges in text scan

DiskIoMgr::AddScanRanges() can return CANCELLED even when adding an
empty list of scan ranges. We should avoid adding empty lists of
scan ranges, specifically after all ranges have been completed, e.g.
if the scanner threads all complete before scan ranges are issued.

The specific race that was possible when adding non-compressed text
ranges was:
1. HdfsTextScanner::IssueInitialRanges() starts executing on thread A and
   issues 1 or more non-compressed ranges.
2. A scanner thread processes all the non-compressed ranges, notices
   completion and calls SetDone(), which cancels reader_context_.
3. HdfsTextScanner::IssueInitialRanges() calls AddScanRanges() with empty
   compressed ranges, and returns CANCELLED because the reader_context_ is
   cancelled.

This patch fixes HdfsTextScanner::IssueInitialRanges() to avoid the
anti-pattern of adding empty lists of scan ranges.

Testing:
Ran core tests.

Looped the table sample and scanner tests for a while.

Change-Id: I661a9e234fb87ebdd2596519b44ffba0d7e91d4c
Reviewed-on: http://gerrit.cloudera.org:8080/9456
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d57fbec6
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d57fbec6
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d57fbec6

Branch: refs/heads/master
Commit: d57fbec6f67b83227b4c6117476da8f7d75fc4f6
Parents: 4c8b02f
Author: Tim Armstrong <ta...@tarmstrong-ubuntu.gce.cloudera.com>
Authored: Mon Feb 26 16:41:59 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 28 01:45:57 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 10 +++++++---
 be/src/exec/hdfs-scan-node-base.cc  |  1 +
 be/src/exec/hdfs-text-scanner.cc    |  6 ++++--
 be/src/runtime/io/disk-io-mgr.cc    |  1 +
 be/src/runtime/io/disk-io-mgr.h     |  2 +-
 5 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d57fbec6/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 51a39be..574ddb0 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -121,9 +121,13 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
       }
     }
   }
-  // The threads that process the footer will also do the scan, so we mark all the files
-  // as complete here.
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
+  // We may not have any scan ranges if this scan node does not have the footer split for
+  // any Parquet file.
+  if (footer_ranges.size() > 0) {
+    // The threads that process the footer will also do the scan, so we mark all the files
+    // as complete here.
+    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d57fbec6/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 89f3859..5625389 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -548,6 +548,7 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
 Status HdfsScanNodeBase::AddDiskIoRanges(
     const vector<ScanRange*>& ranges, int num_files_queued) {
   DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished.";
+  DCHECK_GT(ranges.size(), 0);
   RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/d57fbec6/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 253bcc8..2a6a912 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -144,8 +144,10 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         DCHECK(false);
     }
   }
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
-          compressed_text_files));
+  if (compressed_text_scan_ranges.size() > 0) {
+    RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
+        compressed_text_files));
+  }
   if (lzo_text_files.size() > 0) {
     // This will dlopen the lzo binary and can fail if the lzo binary is not present.
     RETURN_IF_ERROR(HdfsLzoTextScanner::IssueInitialRanges(scan_node, lzo_text_files));

http://git-wip-us.apache.org/repos/asf/impala/blob/d57fbec6/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 07e02b4..0d2afe2 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -338,6 +338,7 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
 
 Status DiskIoMgr::AddScanRanges(
     RequestContext* reader, const vector<ScanRange*>& ranges) {
+  DCHECK_GT(ranges.size(), 0);
   // Validate and initialize all ranges
   for (int i = 0; i < ranges.size(); ++i) {
     RETURN_IF_ERROR(ValidateScanRange(ranges[i]));

http://git-wip-us.apache.org/repos/asf/impala/blob/d57fbec6/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 760d0e9..2b6881b 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -256,7 +256,7 @@ class DiskIoMgr : public CacheLineAligned {
   /// Adds the scan ranges to reader's queues, but does not start scheduling it. The range
   /// can be scheduled by a thread calling GetNextUnstartedRange(). This call is
   /// non-blocking. The caller must not deallocate the scan range pointers before
-  /// UnregisterContext().
+  /// UnregisterContext(). 'ranges' must not be empty.
   Status AddScanRanges(
       RequestContext* reader, const std::vector<ScanRange*>& ranges) WARN_UNUSED_RESULT;