You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by to...@apache.org on 2018/07/18 00:55:14 UTC

[1/3] impala git commit: IMPALA-7186: [DOCS] Documented the KUDU_READ_MODE query option

Repository: impala
Updated Branches:
  refs/heads/master 7f3f63424 -> effe4e666


IMPALA-7186: [DOCS] Documented the KUDU_READ_MODE query option

Change-Id: I49b4ec29ae8cdbee8b3d38bdf2e678b4e9560952
Reviewed-on: http://gerrit.cloudera.org:8080/10897
Reviewed-by: Alex Rodoni <ar...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3f393bc2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3f393bc2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3f393bc2

Branch: refs/heads/master
Commit: 3f393bc27c68f23fa7032fb941adf014e18b7b3b
Parents: 7f3f634
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Mon Jul 9 19:42:03 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 17 21:12:53 2018 +0000

----------------------------------------------------------------------
 docs/impala.ditamap                   |  1 +
 docs/impala_keydefs.ditamap           |  3 +
 docs/topics/impala_kudu_read_mode.xml | 88 ++++++++++++++++++++++++++++++
 3 files changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3f393bc2/docs/impala.ditamap
----------------------------------------------------------------------
diff --git a/docs/impala.ditamap b/docs/impala.ditamap
index 060a819..7e8b106 100644
--- a/docs/impala.ditamap
+++ b/docs/impala.ditamap
@@ -192,6 +192,7 @@ under the License.
           <topicref href="topics/impala_explain_level.xml"/>
           <topicref href="topics/impala_hbase_cache_blocks.xml"/>
           <topicref href="topics/impala_hbase_caching.xml"/>
+          <topicref href="topics/impala_kudu_read_mode.xml"/>
           <topicref href="topics/impala_live_progress.xml"/>
           <topicref href="topics/impala_live_summary.xml"/>
           <topicref href="topics/impala_max_errors.xml"/>

http://git-wip-us.apache.org/repos/asf/impala/blob/3f393bc2/docs/impala_keydefs.ditamap
----------------------------------------------------------------------
diff --git a/docs/impala_keydefs.ditamap b/docs/impala_keydefs.ditamap
index 0950af2..ac97531 100644
--- a/docs/impala_keydefs.ditamap
+++ b/docs/impala_keydefs.ditamap
@@ -10521,6 +10521,7 @@ under the License.
   <keydef href="https://issues.apache.org/jira/browse/IMPALA-9999" scope="external" format="html" keys="IMPALA-9999"/>
 
 <!-- Short form of mapping from Impala release to vendor-specific releases, for use in headings. -->
+  <keydef keys="impala31"><topicmeta><keywords><keyword>Impala 3.1</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala30"><topicmeta><keywords><keyword>Impala 3.0</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala212"><topicmeta><keywords><keyword>Impala 2.12</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala211"><topicmeta><keywords><keyword>Impala 2.11</keyword></keywords></topicmeta></keydef>
@@ -10540,6 +10541,7 @@ under the License.
 
 <!-- 3-part forms of version numbers, for use in release notes. -->
 <!-- Using spaced-out form to avoid conflict with variable for 2.1.10 -->
+  <keydef keys="impala3_1_0"><topicmeta><keywords><keyword>Impala 3.1.0</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala3_00_0"><topicmeta><keywords><keyword>Impala 3.0.0</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala2_12_0"><topicmeta><keywords><keyword>Impala 2.12.0</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala2_11_0"><topicmeta><keywords><keyword>Impala 2.11.0</keyword></keywords></topicmeta></keydef>
@@ -10581,6 +10583,7 @@ under the License.
   <keydef keys="impala132"><topicmeta><keywords><keyword>Impala 1.3.2</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala130"><topicmeta><keywords><keyword>Impala 1.3.0</keyword></keywords></topicmeta></keydef>
 
+  <keydef keys="impala31_full"><topicmeta><keywords><keyword>Impala 3.1</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala30_full"><topicmeta><keywords><keyword>Impala 3.0</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala212_full"><topicmeta><keywords><keyword>Impala 2.12</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala211_full"><topicmeta><keywords><keyword>Impala 2.11</keyword></keywords></topicmeta></keydef>

http://git-wip-us.apache.org/repos/asf/impala/blob/3f393bc2/docs/topics/impala_kudu_read_mode.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_kudu_read_mode.xml b/docs/topics/impala_kudu_read_mode.xml
new file mode 100644
index 0000000..a2d34b0
--- /dev/null
+++ b/docs/topics/impala_kudu_read_mode.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE concept PUBLIC "-//OASIS//DTD DITA Concept//EN" "concept.dtd">
+<concept id="kudu_read_mode" rev="3.1.0 IMPALA-6812">
+
+  <title>KUDU_READ_MODE Query Option (<keyword keyref="impala31"/> or higher only)</title>
+
+  <titlealts audience="PDF">
+
+    <navtitle>KUDU_READ_MODE</navtitle>
+
+  </titlealts>
+
+  <prolog>
+    <metadata>
+      <data name="Category" value="Impala"/>
+      <data name="Category" value="Impala Query Options"/>
+      <data name="Category" value="Performance"/>
+      <data name="Category" value="Developers"/>
+      <data name="Category" value="Data Analysts"/>
+    </metadata>
+  </prolog>
+
+  <conbody>
+
+    <p rev="3.1.0 IMPALA-6812">
+      The <codeph>KUDU_READ_MODE</codeph> query option allows you to set a desired consistency
+      level for scans of Kudu tables.
+    </p>
+
+    <p>
+      <b>Type:</b> String
+    </p>
+
+    <p>
+      <b>Default:</b> <codeph>"DEFAULT"</codeph>
+    </p>
+
+    <p>
+      <b>Added in:</b> <keyword keyref="impala31"/>
+    </p>
+
+    <p conref="../shared/impala_common.xml#common/usage_notes_blurb"/>
+
+    <p>
+      The following values are supported for the query option:
+      <ul>
+        <li>
+          <codeph>"DEFAULT"</codeph>: The value of the startup flag,
+          <codeph>--kudu_read_mode</codeph>, is used.
+        </li>
+
+        <li>
+          <codeph>"READ_LATEST"</codeph>: Commonly known as the Read Committed isolation mode,
+          in which Kudu provides no consistency guarantees, except that all returned rows
+          were committed at some point.
+        </li>
+
+        <li>
+          <codeph>"READ_AT_SNAPSHOT"</codeph>: Kudu will take a snapshot of the current state of
+          the data and perform the scan over the snapshot, possibly after briefly waiting for
+          ongoing writes to complete. This provides "Read Your Writes" consistency within a
+          single Impala session, except in the case of a Kudu leader change. See the Kudu
+          documentation for more details.
+        </li>
+      </ul>
+    </p>
+
+  </conbody>
+
+</concept>


[3/3] impala git commit: IMPALA-7294. TABLESAMPLE should not allocate array based on total table file count

Posted by to...@apache.org.
IMPALA-7294. TABLESAMPLE should not allocate array based on total table file count

This changes HdfsTable.getFilesSample() to allocate its intermediate
sampling array based on the number of files in the selected
(post-pruning) partitions, rather than the total number of files in the
table. While the previous behavior was correct (the total file count is of
course an upper bound on the pruned file count), it was an unnecessarily
large allocation, which has some downsides around garbage collection.

In addition, this is important for the LocalCatalog implementation of
table sampling, since we do not want to have to load all partition file
lists in order to compute a sample over a pruned subset of partitions.

The original code indicated that this was an optimization to avoid
looping over the partition list an extra time. However, partition
lists are relatively small even in the worst case (on the order of
100k entries), and looping over 100k in-memory Java objects is not
likely to be the bottleneck in planning any query. This is especially
true considering that we loop over that same list later in the function
anyway, so we probably aren't saving page faults or last-level cache
(LLC) misses either.

In testing this change I noticed that the existing test for TABLESAMPLE
didn't test TABLESAMPLE when applied in conjunction with a predicate.
I added a new dimension to the test which employs a predicate which
prunes some partitions to ensure that the code works in that case.
I also added coverage of the "100%" sampling parameter as a sanity check
that it returns the same results as a non-sampled query.

Change-Id: I0248d89bcd9dd4ff8b4b85fef282c19e3fe9bdd5
Reviewed-on: http://gerrit.cloudera.org:8080/10936
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/effe4e66
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/effe4e66
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/effe4e66

Branch: refs/heads/master
Commit: effe4e66684fcd86f0e6b78bcc7b815a951e3018
Parents: c845aab
Author: Todd Lipcon <to...@cloudera.com>
Authored: Thu Jul 12 17:07:09 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Jul 17 22:56:50 2018 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/HdfsTable.java    | 22 +++++++++++++-------
 tests/query_test/test_tablesample.py            | 20 +++++++++++++-----
 2 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/effe4e66/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index de9e3d8..957b1f9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2158,16 +2158,20 @@ public class HdfsTable extends Table implements FeFsTable {
     Preconditions.checkState(percentBytes >= 0 && percentBytes <= 100);
     Preconditions.checkState(minSampleBytes >= 0);
 
+    long totalNumFiles = 0;
+    for (FeFsPartition part : inputParts) {
+      totalNumFiles += part.getNumFileDescriptors();
+    }
+
     // Conservative max size for Java arrays. The actual maximum varies
     // from JVM version and sometimes between configurations.
     final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
-    if (fileMetadataStats_.numFiles > JVM_MAX_ARRAY_SIZE) {
+    if (totalNumFiles > JVM_MAX_ARRAY_SIZE) {
       throw new IllegalStateException(String.format(
-          "Too many files to generate a table sample. " +
-          "Table '%s' has %s files, but a maximum of %s files are supported.",
-          getTableName().toString(), fileMetadataStats_.numFiles, JVM_MAX_ARRAY_SIZE));
+          "Too many files to generate a table sample of table %s. " +
+          "Sample requested over %s files, but a maximum of %s files are supported.",
+          getTableName().toString(), totalNumFiles, JVM_MAX_ARRAY_SIZE));
     }
-    int totalNumFiles = (int) fileMetadataStats_.numFiles;
 
     // Ensure a consistent ordering of files for repeatable runs. The files within a
     // partition are already ordered based on how they are loaded in the catalog.
@@ -2177,12 +2181,11 @@ public class HdfsTable extends Table implements FeFsTable {
     // fileIdxs contains indexes into the file descriptor lists of all inputParts
     // parts[i] contains the partition corresponding to fileIdxs[i]
     // fileIdxs[i] is an index into the file descriptor list of the partition parts[i]
-    // Use max size to avoid looping over inputParts for the exact size.
     // The purpose of these arrays is to efficiently avoid selecting the same file
     // multiple times during the sampling, regardless of the sample percent. We purposely
     // avoid generating objects proportional to the number of files.
-    int[] fileIdxs = new int[totalNumFiles];
-    FeFsPartition[] parts = new FeFsPartition[totalNumFiles];
+    int[] fileIdxs = new int[(int)totalNumFiles];
+    FeFsPartition[] parts = new FeFsPartition[(int)totalNumFiles];
     int idx = 0;
     long totalBytes = 0;
     for (FeFsPartition part: orderedParts) {
@@ -2194,6 +2197,9 @@ public class HdfsTable extends Table implements FeFsTable {
         ++idx;
       }
     }
+    if (idx != totalNumFiles) {
+      throw new AssertionError("partition file counts changed during iteration");
+    }
 
     int numFilesRemaining = idx;
     double fracPercentBytes = (double) percentBytes / 100;

http://git-wip-us.apache.org/repos/asf/impala/blob/effe4e66/tests/query_test/test_tablesample.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_tablesample.py b/tests/query_test/test_tablesample.py
index 4bc7e1f..8652382 100644
--- a/tests/query_test/test_tablesample.py
+++ b/tests/query_test/test_tablesample.py
@@ -32,6 +32,7 @@ class TestTableSample(ImpalaTestSuite):
   def add_test_dimensions(cls):
     super(TestTableSample, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('repeatable', *[True, False]))
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('filtered', *[True, False]))
     # Tablesample is only supported on HDFS tables.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
       v.get_value('table_format').file_format != 'kudu' and
@@ -48,15 +49,21 @@ class TestTableSample(ImpalaTestSuite):
     # 2. The results of queries without a repeatable clause could change due to
     # changes in data loading that affect the number or size of files.
     repeatable = vector.get_value('repeatable')
+    filtered = vector.get_value('filtered')
+
+    where_clause = ""
+    if filtered:
+      where_clause = "where month between 1 and 6"
+
     ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
-    result = self.client.execute("select count(*) from alltypes")
+    result = self.client.execute("select count(*) from alltypes %s" % where_clause)
     baseline_count = int(result.data[0])
     prev_count = None
-    for perc in [5, 20, 50]:
+    for perc in [5, 20, 50, 100]:
       rep_sql = ""
       if repeatable: rep_sql = " repeatable(1)"
-      sql_stmt = "select count(*) from alltypes tablesample system(%s)%s" \
-                 % (perc, rep_sql)
+      sql_stmt = "select count(*) from alltypes tablesample system(%s)%s %s" \
+                 % (perc, rep_sql, where_clause)
       handle = self.client.execute_async(sql_stmt)
       # IMPALA-6352: flaky test, possibly due to a hung thread. Wait for 500 sec before
       # failing and logging the backtraces of all impalads.
@@ -76,7 +83,10 @@ class TestTableSample(ImpalaTestSuite):
       result = self.client.fetch(sql_stmt, handle)
       self.client.close_query(handle)
       count = int(result.data[0])
-      assert count < baseline_count
+      if perc < 100:
+        assert count < baseline_count
+      else:
+        assert count == baseline_count
       if prev_count and repeatable:
         # May not necessarily be true for non-repeatable samples
         assert count > prev_count


[2/3] impala git commit: cleanup: extract RowBatchQueue into its own file

Posted by to...@apache.org.
cleanup: extract RowBatchQueue into its own file

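While looking at IMPALA-7096, I noticed that RowBatchQueue was
implemented in a strange place: as a nested class of ExecNode in
exec/exec-node.h and exec/exec-node.cc. Move it into its own files,
runtime/row-batch-queue.h and runtime/row-batch-queue.cc.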

Change-Id: I3577c1c6920b8cf858c8d49f8812ccc305d833f6
Reviewed-on: http://gerrit.cloudera.org:8080/10943
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c845aab8
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c845aab8
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c845aab8

Branch: refs/heads/master
Commit: c845aab86ea8ffb27ffb8d2f577f7a71e7e46120
Parents: 3f393bc
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jul 12 23:10:38 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 17 22:06:54 2018 +0000

----------------------------------------------------------------------
 be/src/exec/blocking-join-node.h         |  1 +
 be/src/exec/exec-node.cc                 | 31 -------------
 be/src/exec/exec-node.h                  | 35 ---------------
 be/src/exec/hdfs-scan-node.cc            |  1 +
 be/src/exec/kudu-scan-node.cc            |  1 +
 be/src/exec/scan-node.cc                 |  1 +
 be/src/exec/scan-node.h                  |  1 +
 be/src/exec/scanner-context.cc           |  1 +
 be/src/runtime/CMakeLists.txt            |  1 +
 be/src/runtime/data-stream-recvr.cc      |  1 +
 be/src/runtime/krpc-data-stream-recvr.cc |  1 +
 be/src/runtime/row-batch-queue.cc        | 55 +++++++++++++++++++++++
 be/src/runtime/row-batch-queue.h         | 65 +++++++++++++++++++++++++++
 13 files changed, 129 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index b7dd79a..8198ad0 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -25,6 +25,7 @@
 
 #include "exec/exec-node.h"
 #include "util/promise.h"
+#include "util/stopwatch.h"
 
 #include "gen-cpp/PlanNodes_types.h"  // for TJoinOp
 

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 384d65d..766f421 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -77,37 +77,6 @@ int ExecNode::GetNodeIdFromProfile(RuntimeProfile* p) {
   return p->metadata();
 }
 
-ExecNode::RowBatchQueue::RowBatchQueue(int max_batches)
-  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {
-}
-
-ExecNode::RowBatchQueue::~RowBatchQueue() {
-  DCHECK(cleanup_queue_.empty());
-}
-
-void ExecNode::RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
-  if (!BlockingPut(move(batch))) {
-    lock_guard<SpinLock> l(lock_);
-    cleanup_queue_.push_back(move(batch));
-  }
-}
-
-unique_ptr<RowBatch> ExecNode::RowBatchQueue::GetBatch() {
-  unique_ptr<RowBatch> result;
-  if (BlockingGet(&result)) return result;
-  return unique_ptr<RowBatch>();
-}
-
-void ExecNode::RowBatchQueue::Cleanup() {
-  unique_ptr<RowBatch> batch = NULL;
-  while ((batch = GetBatch()) != NULL) {
-    batch.reset();
-  }
-
-  lock_guard<SpinLock> l(lock_);
-  cleanup_queue_.clear();
-}
-
 ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
   : id_(tnode.node_id),
     type_(tnode.node_type),

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 9a87a56..a62ed6c 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -30,7 +30,6 @@
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/descriptors.h" // for RowDescriptor
 #include "runtime/reservation-manager.h"
-#include "util/blocking-queue.h"
 #include "util/runtime-profile.h"
 
 namespace impala {
@@ -243,40 +242,6 @@ class ExecNode {
     return reservation_manager_.ReleaseUnusedReservation();
   }
 
-  /// Extends blocking queue for row batches. Row batches have a property that
-  /// they must be processed in the order they were produced, even in cancellation
-  /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
-  /// and we need to make sure those ptrs stay valid.
-  /// Row batches that are added after Shutdown() are queued in another queue, which can
-  /// be cleaned up during Close().
-  /// All functions are thread safe.
-  class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
-   public:
-    /// max_batches is the maximum number of row batches that can be queued.
-    /// When the queue is full, producers will block.
-    RowBatchQueue(int max_batches);
-    ~RowBatchQueue();
-
-    /// Adds a batch to the queue. This is blocking if the queue is full.
-    void AddBatch(std::unique_ptr<RowBatch> batch);
-
-    /// Gets a row batch from the queue. Returns NULL if there are no more.
-    /// This function blocks.
-    /// Returns NULL after Shutdown().
-    std::unique_ptr<RowBatch> GetBatch();
-
-    /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
-    /// after this is called.
-    void Cleanup();
-
-   private:
-    /// Lock protecting cleanup_queue_
-    SpinLock lock_;
-
-    /// Queue of orphaned row batches
-    std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
-  };
-
   /// Unique within a single plan tree.
   int id_;
   TPlanNodeType::type type_;

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 1ceaf2c..5d4f9b0 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -31,6 +31,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/thread-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 48816f9..30194f9 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -27,6 +27,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/thread-resource-mgr.h"
 #include "runtime/tuple-row.h"
 #include "util/runtime-profile-counters.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 01aa269..4d59eed 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -22,6 +22,7 @@
 #include "exprs/scalar-expr.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "util/disk-info.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 63bb59b..1d0728c 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -28,6 +28,7 @@
 
 namespace impala {
 
+class RowBatchQueue;
 class TScanRange;
 
 /// Abstract base class of all scan nodes. Subclasses support different storage layers

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 75aacee..a9fad6a 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -26,6 +26,7 @@
 #include "runtime/exec-env.h"
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-buffer.h"
 #include "util/debug-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 2fea5bd..e09b27c 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -61,6 +61,7 @@ add_library(Runtime
   reservation-manager.cc
   row-batch.cc
   ${ROW_BATCH_PROTO_SRCS}
+  row-batch-queue.cc
   runtime-filter.cc
   runtime-filter-bank.cc
   runtime-filter-ir.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 8d9047f..c9a9ab9 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -22,6 +22,7 @@
 #include "runtime/data-stream-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/sorted-run-merger.h"
 #include "util/condition-variable.h"
 #include "util/runtime-profile-counters.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index be51f32..3933e02 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -31,6 +31,7 @@
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/row-batch-queue.h"
 #include "runtime/sorted-run-merger.h"
 #include "service/data-stream-service.h"
 #include "util/runtime-profile-counters.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/runtime/row-batch-queue.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-queue.cc b/be/src/runtime/row-batch-queue.cc
new file mode 100644
index 0000000..1fd5555
--- /dev/null
+++ b/be/src/runtime/row-batch-queue.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/row-batch-queue.h"
+
+#include "runtime/row-batch.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+RowBatchQueue::RowBatchQueue(int max_batches)
+  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {}
+
+RowBatchQueue::~RowBatchQueue() {
+  // Any batches parked by AddBatch() must already have been freed by Cleanup().
+  DCHECK(cleanup_queue_.empty());
+}
+
+void RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
+  // Normal case: the underlying blocking queue accepted the batch.
+  if (BlockingPut(move(batch))) return;
+
+  // The put was rejected (per the class contract, after Shutdown()). Park the batch
+  // in cleanup_queue_ so that it is released later by Cleanup() rather than here.
+  // NOTE(review): this relies on BlockingPut() leaving 'batch' untouched when it
+  // returns false; confirm against util/blocking-queue.h.
+  lock_guard<SpinLock> cleanup_lock(lock_);
+  cleanup_queue_.push_back(move(batch));
+}
+
+unique_ptr<RowBatch> RowBatchQueue::GetBatch() {
+  unique_ptr<RowBatch> next_batch;
+  // A failed BlockingGet() means there is nothing more to hand out.
+  if (!BlockingGet(&next_batch)) return unique_ptr<RowBatch>();
+  return next_batch;
+}
+
+void RowBatchQueue::Cleanup() {
+  // Drain the main queue. Each returned batch is a temporary, so it is freed
+  // before the next GetBatch() call.
+  while (GetBatch() != nullptr) {
+  }
+
+  // Free the batches that AddBatch() parked after Shutdown().
+  lock_guard<SpinLock> cleanup_lock(lock_);
+  cleanup_queue_.clear();
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/c845aab8/be/src/runtime/row-batch-queue.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-queue.h b/be/src/runtime/row-batch-queue.h
new file mode 100644
index 0000000..bd2f551
--- /dev/null
+++ b/be/src/runtime/row-batch-queue.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_ROW_BATCH_QUEUE_H
+#define IMPALA_RUNTIME_ROW_BATCH_QUEUE_H
+
+#include <list>
+#include <memory>
+
+#include "util/blocking-queue.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class RowBatch;
+
+/// Extends blocking queue for row batches. Row batches have a property that
+/// they must be processed in the order they were produced, even in cancellation
+/// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
+/// and we need to make sure those ptrs stay valid.
+/// Row batches that are added after Shutdown() are queued in a separate "cleanup"
+/// queue, which can be cleaned up during Close() by calling Cleanup().
+/// Cleanup() must be called before the queue is destroyed: the destructor DCHECKs
+/// that the cleanup queue is empty.
+/// All functions are thread safe.
+class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
+ public:
+  /// max_batches is the maximum number of row batches that can be queued.
+  /// When the queue is full, producers will block.
+  /// Explicit so that an int is never silently converted into a queue.
+  explicit RowBatchQueue(int max_batches);
+  ~RowBatchQueue();
+
+  /// Adds a batch to the queue. This is blocking if the queue is full.
+  /// If the queue has been shut down, the batch is kept in the cleanup queue
+  /// until Cleanup() is called.
+  void AddBatch(std::unique_ptr<RowBatch> batch);
+
+  /// Gets a row batch from the queue. Returns nullptr if there are no more.
+  /// This function blocks.
+  /// Returns nullptr after Shutdown().
+  std::unique_ptr<RowBatch> GetBatch();
+
+  /// Drains and deletes all remaining row batches, including those in
+  /// cleanup_queue_. Not valid to call AddBatch() after this is called.
+  void Cleanup();
+
+ private:
+  /// Lock protecting cleanup_queue_
+  SpinLock lock_;
+
+  /// Queue of orphaned row batches (batches added after Shutdown()).
+  std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
+};
+} // namespace impala
+#endif // IMPALA_RUNTIME_ROW_BATCH_QUEUE_H