Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/31 06:48:27 UTC

[1/5] impala git commit: IMPALA-7209: Disallow self referencing in ALTER VIEW statements

Repository: impala
Updated Branches:
  refs/heads/master 10a67509f -> 316b17ac5


IMPALA-7209: Disallow self referencing in ALTER VIEW statements

Previously, ALTER VIEW queries did not carry out reference checks
in the analysis phase. This allowed the DDL operation to succeed,
but subsequent queries against the view threw a StackOverflowError
because the catalog was unable to resolve the reference. With this
change, AlterViewStmt checks for direct and indirect self-references
before altering a view.
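
For example, "alter view functional.alltypes_view as select * from
functional.alltypes_view" (or an indirect reference through a dependent
view such as functional.view_view) now fails analysis with
"Self-reference not allowed on view: functional.alltypes_view", as
exercised by the new tests in AnalyzeDDLTest below.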

Testing: Added tests to AnalyzeDDLTest to verify the new checks.
Change-Id: I17c231c9d74d9d411463a408b086eb874090b9b7
Reviewed-on: http://gerrit.cloudera.org:8080/10908
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9146f73a
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9146f73a
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9146f73a

Branch: refs/heads/master
Commit: 9146f73a58c645cd861006cd6c90d9cbf08feeec
Parents: 10a6750
Author: poojanilangekar <po...@cloudera.com>
Authored: Mon Jul 9 16:43:42 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 02:11:52 2018 +0000

----------------------------------------------------------------------
 .../apache/impala/analysis/AlterViewStmt.java   | 11 ++++
 .../org/apache/impala/analysis/QueryStmt.java   | 14 +++++
 .../org/apache/impala/analysis/SelectStmt.java  | 35 +++++++++++++
 .../org/apache/impala/analysis/UnionStmt.java   | 10 ++++
 .../apache/impala/analysis/AnalyzeDDLTest.java  | 54 +++++++++++++++++++-
 5 files changed, 123 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9146f73a/fe/src/main/java/org/apache/impala/analysis/AlterViewStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterViewStmt.java
index 910dd98..19b9fd0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterViewStmt.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeTable;
@@ -29,6 +30,7 @@ import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.service.BackendConfig;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 
 /**
  * Represents an ALTER VIEW AS statement.
@@ -58,6 +60,15 @@ public class AlterViewStmt extends CreateOrAlterViewStmtBase {
     analyzer.addAccessEvent(new TAccessEvent(dbName_ + "." + tableName_.getTbl(),
         TCatalogObjectType.VIEW, Privilege.ALTER.toString()));
 
+    // viewDefStmt_ should not contain any references to the view being altered.
+    Set<FeView> inlineViews = Sets.newHashSet();
+    viewDefStmt_.collectInlineViews(inlineViews);
+    TableRef tblRef = analyzer.resolveTableRef(new TableRef(tableName_.toPath(), null));
+    if (inlineViews.contains(((InlineViewRef) tblRef).getView())) {
+      throw new AnalysisException(
+          String.format("Self-reference not allowed on view: %s", tblRef.toSql()));
+    }
+
     createColumnAndViewDefs(analyzer);
     if (BackendConfig.INSTANCE.getComputeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
       computeLineageGraph(analyzer);

http://git-wip-us.apache.org/repos/asf/impala/blob/9146f73a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index 6ad1c8a..a561cbd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.common.AnalysisException;
@@ -131,6 +132,19 @@ public abstract class QueryStmt extends StatementBase {
     }
   }
 
+  /**
+   * Collects all inline views referenced by this statement into 'inlineViews'.
+   */
+  public void collectInlineViews(Set<FeView> inlineViews) {
+    if (withClause_ != null) {
+      List<? extends FeView> withClauseViews = withClause_.getViews();
+      for (FeView withView : withClauseViews) {
+        inlineViews.add(withView);
+        withView.getQueryStmt().collectInlineViews(inlineViews);
+      }
+    }
+  }
+
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     if (isAnalyzed()) return;

http://git-wip-us.apache.org/repos/asf/impala/blob/9146f73a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 6156f0b..07a5cd7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.impala.analysis.Path.PathType;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
@@ -144,6 +145,20 @@ public class SelectStmt extends QueryStmt {
   }
 
   /**
+   * @return the QueryStmt of the subquery in whereClause_, or null if there is none.
+   */
+  private QueryStmt getWhereSubQueryStmt() {
+    QueryStmt whereQueryStmt = null;
+    if (whereClause_ != null) {
+      Subquery whereSubquery = whereClause_.getSubquery();
+      if (whereSubquery != null) {
+        whereQueryStmt = whereSubquery.getStatement();
+      }
+    }
+    return whereQueryStmt;
+  }
+
+  /**
    * Creates resultExprs and baseTblResultExprs.
    */
   @Override
@@ -1060,6 +1075,26 @@ public class SelectStmt extends QueryStmt {
   }
 
   @Override
+  public void collectInlineViews(Set<FeView> inlineViews) {
+    // Impala currently supports subqueries only in FROM, WHERE & WITH clauses. Hence,
+    // this function does not carry out any checks on the HAVING clause.
+    super.collectInlineViews(inlineViews);
+    List<TableRef> fromTblRefs = getTableRefs();
+    Preconditions.checkNotNull(inlineViews);
+    for (TableRef fromTblRef : fromTblRefs) {
+      if (fromTblRef instanceof InlineViewRef) {
+        InlineViewRef inlineViewRef = (InlineViewRef) fromTblRef;
+        inlineViews.add(inlineViewRef.getView());
+        inlineViewRef.getViewStmt().collectInlineViews(inlineViews);
+      }
+    }
+    QueryStmt whereStmt = getWhereSubQueryStmt();
+    if (whereStmt != null) {
+      whereStmt.collectInlineViews(inlineViews);
+    }
+  }
+
+  @Override
   public void reset() {
     super.reset();
     selectList_.reset();

http://git-wip-us.apache.org/repos/asf/impala/blob/9146f73a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
index 313479e..37500bf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UnionStmt.java
@@ -19,8 +19,10 @@ package org.apache.impala.analysis;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.impala.catalog.ColumnStats;
+import org.apache.impala.catalog.FeView;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.rewrite.ExprRewriter;
 import org.slf4j.Logger;
@@ -551,6 +553,14 @@ public class UnionStmt extends QueryStmt {
   }
 
   @Override
+  public void collectInlineViews(Set<FeView> inlineViews) {
+    super.collectInlineViews(inlineViews);
+    for (UnionOperand operand : operands_) {
+      operand.getQueryStmt().collectInlineViews(inlineViews);
+    }
+  }
+
+  @Override
   public String toSql(boolean rewritten) {
     if (!rewritten && toSqlString_ != null) return toSqlString_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/9146f73a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 46b3787..2c40545 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1086,7 +1086,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "select * from functional.alltypesagg");
     // View-definition references a view.
     AnalyzesOk("alter view functional.alltypes_view as " +
-        "select * from functional.alltypes_view");
+        "select * from functional.alltypes_view_sub");
     // Change column definitions.
     AnalyzesOk("alter view functional.alltypes_view (a, b) as " +
         "select int_col, string_col from functional.alltypes");
@@ -1164,6 +1164,58 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalysisError("alter view functional.alltypes_view (a, b, a) as " +
         "select int_col, int_col, int_col from functional.alltypes",
         "Duplicate column name: a");
+
+    // Self-referencing view in view definition - SELECT.
+    AnalysisError("alter view functional.alltypes_view as " +
+        "select * from functional.alltypes_view",
+        "Self-reference not allowed on view: functional.alltypes_view");
+    AnalysisError("alter view functional.alltypes_view (a, b, c) as " +
+        "select smallint_col, int_col, bigint_col from functional.alltypes_view",
+        "Self-reference not allowed on view: functional.alltypes_view");
+    // Self-referencing view in view definition - UNION.
+    AnalysisError("alter view functional.alltypes_view as " +
+        "select * from functional.alltypes union all " +
+        "select * from functional.alltypes_view",
+        "Self-reference not allowed on view: functional.alltypes_view");
+    AnalysisError("alter view functional.alltypes_view as " +
+        "select * from functional.alltypes union distinct " +
+        "select * from functional.alltypes_view",
+        "Self-reference not allowed on view: functional.alltypes_view");
+    // Self-referencing view via a dependent view.
+    AnalysisError("alter view functional.alltypes_view as " +
+        "select * from functional.view_view",
+        "Self-reference not allowed on view: functional.alltypes_view");
+    AnalysisError("alter view functional.alltypes_view (a, b) as " +
+        "select smallint_col, int_col from functional.view_view",
+        "Self-reference not allowed on view: functional.alltypes_view");
+    // Self-referencing view in where clause.
+    AnalysisError("alter view functional.alltypes_view as " +
+        "select * from functional.alltypes " +
+        "where id > (select sum(tinyint_col) from functional.alltypes_view)",
+        "Self-reference not allowed on view: functional.alltypes_view");
+    // Self-referencing view in with clause.
+    AnalysisError("alter view functional.alltypes_view as " +
+        "with temp_view(col_a, col_b, col_c) as " +
+        "(select tinyint_col, int_col, bigint_col from functional.alltypes_view) " +
+        "select * from temp_view",
+        "Self-reference not allowed on view: functional.alltypes_view");
+    AnalysisError("alter view functional.alltypes_view as " +
+        "with temp_view(col_a, col_b, col_c) as " +
+        "(select tinyint_col, int_col, bigint_col from functional.alltypes_view) " +
+        "select * from functional.alltypes",
+        "Self-reference not allowed on view: functional.alltypes_view");
+    AnalysisError("alter view functional.alltypes_view as " +
+        "with temp_view(col_a, col_b, col_c) as " +
+        "(select tinyint_col, int_col, bigint_col from functional.alltypes_view) " +
+        "select * from temp_view union all " +
+        "select tinyint_col, int_col, bigint_col from functional.alltypes",
+        "Self-reference not allowed on view: functional.alltypes_view");
+    AnalysisError("alter view functional.alltypes_view as " +
+        "with temp_view(col_a, col_b, col_c) as " +
+        "(select tinyint_col, int_col, bigint_col from functional.alltypes_view) " +
+        "select * from temp_view union distinct " +
+        "select tinyint_col, int_col, bigint_col from functional.alltypes",
+        "Self-reference not allowed on view: functional.alltypes_view");
   }
 
   @Test


[2/5] impala git commit: IMPALA-7333: remove MarkNeedsDeepCopy() in agg and BTS

Posted by ta...@apache.org.
IMPALA-7333: remove MarkNeedsDeepCopy() in agg and BTS

This takes advantage of earlier work (e.g. IMPALA-3200, IMPALA-5844)
to remove a couple of uses of the MarkNeedsDeepCopy() API.

Testing:
Ran core, ASAN and exhaustive builds.

Added unit tests to directly test the attaching behaviour.
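
For reference, the attach-on-read consumption pattern exercised by the
new tests looks roughly like this (a minimal sketch based on
TestAttachMemory below; it assumes 'stream' is an initialized
BufferedTupleStream with rows already appended, 'out_batch' is a
caller-owned RowBatch, and reservation/error handling is elided):

  bool got_read_reservation;
  RETURN_IF_ERROR(stream.PrepareForRead(/*attach_on_read=*/true,
      &got_read_reservation));
  // (A real caller must handle got_read_reservation == false.)
  bool eos = false;
  do {
    RETURN_IF_ERROR(stream.GetNext(out_batch, &eos));
    // Pages whose last row was just returned arrive attached to
    // 'out_batch' with the FLUSH_RESOURCES flag set; the caller now
    // owns the lifetime of those buffers.
    // ... process or hand off 'out_batch' ...
    out_batch->Reset();
  } while (!eos);
  // All buffers were handed off during iteration, so there is nothing
  // left to attach at Close().
  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);

This replaces the old contract where the batch could come back with
needs_deep_copy set and callers had to copy out any row data before the
next GetNext() call.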

Change-Id: I91ac53bacc00df4726c015a30ba5a2026aa4b5f5
Reviewed-on: http://gerrit.cloudera.org:8080/11007
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/240fde62
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/240fde62
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/240fde62

Branch: refs/heads/master
Commit: 240fde62d532c7166fc613a97b38c199cec09f1f
Parents: 9146f73
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jul 19 11:20:40 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 02:25:27 2018 +0000

----------------------------------------------------------------------
 be/src/exec/grouping-aggregator.cc           |   8 +-
 be/src/runtime/buffered-tuple-stream-test.cc | 249 ++++++++++++++++++++--
 be/src/runtime/buffered-tuple-stream.cc      | 145 ++++++++-----
 be/src/runtime/buffered-tuple-stream.h       | 138 +++++++-----
 be/src/runtime/row-batch.h                   |   1 +
 be/src/runtime/tuple.h                       |   4 +
 6 files changed, 408 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 092ecfd..42d6be8 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -294,8 +294,12 @@ Status GroupingAggregator::GetRowsFromPartition(
 
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   partition_eos_ = ReachedLimit();
-  if (output_iterator_.AtEnd()) row_batch->MarkNeedsDeepCopy();
-
+  if (partition_eos_ || output_iterator_.AtEnd()) {
+    // Attach all buffers referenced by previously-returned rows. On the next GetNext()
+    // call we will close the partition.
+    output_partition_->aggregated_row_stream->Close(
+        row_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index ef66824..6ff9805 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -398,6 +398,22 @@ class SimpleTupleStreamTest : public testing::Test {
 
   void TestTransferMemory(bool pinned_stream, bool read_write);
 
+  void TestAttachMemory(bool pinned_stream, bool attach_on_read);
+
+  void TestFlushResourcesReadWrite(bool pinned_stream, bool attach_on_read);
+
+  /// Helper for TestFlushResourcesReadWrite() to write and read back rows from
+  /// *stream. 'append_batch_size' is the number of rows to append at a time before
+  /// reading them back. *num_buffers_attached tracks the number of buffers attached
+  /// to the output batch.
+  void AppendToReadWriteStream(int64_t append_batch_size, int64_t buffer_size,
+      int* num_buffers_attached, BufferedTupleStream* stream);
+
+  // Helper for AppendToReadWriteStream() to verify 'out_batch' contents. The value of
+  // row i of 'out_batch' is expected to be the same as the row at index
+  // (i + start_index) % in_batch->num_rows() of 'in_batch'.
+  void VerifyReadWriteBatch(RowBatch* in_batch, RowBatch* out_batch, int64_t start_index);
+
 // Helper to write 'row' comprised of only string slots to 'data'. The expected
   // length of the data written is 'expected_len'.
   void WriteStringRow(const RowDescriptor* row_desc, TupleRow* row, int64_t fixed_size,
@@ -649,13 +665,13 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
   ASSERT_TRUE(pinned);
 
   // Read and verify result a few times. We should be able to reread the stream if
-  // we don't use delete on read mode.
+  // we don't use attach on read mode.
   int read_iters = 3;
   for (int i = 0; i < read_iters; ++i) {
-    bool delete_on_read = i == read_iters - 1;
+    bool attach_on_read = i == read_iters - 1;
     if (i > 0 || !read_write) {
       bool got_read_reservation;
-      ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+      ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
       ASSERT_TRUE(got_read_reservation);
     }
 
@@ -670,15 +686,13 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
     }
   }
 
-  // After delete_on_read, all blocks aside from the last should be deleted.
-  // Note: this should really be 0, but the BufferedTupleStream returns eos before
-  // deleting the last block, rather than after, so the last block isn't deleted
-  // until the stream is closed.
-  ASSERT_EQ(stream.BytesPinned(false), buffer_size);
+  // After attach_on_read, all buffers should have been attached to the output batches
+  // on previous GetNext() calls.
+  ASSERT_EQ(0, stream.BytesPinned(false));
 
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 
-  ASSERT_EQ(stream.BytesPinned(false), 0);
+  ASSERT_EQ(0, stream.BytesPinned(false));
 }
 
 TEST_F(SimpleTupleStreamTest, UnpinPin) {
@@ -765,6 +779,205 @@ TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) {
   TestTransferMemory(false, false);
 }
 
+/// Test iteration over a stream with and without attaching memory.
+void SimpleTupleStreamTest::TestAttachMemory(bool pin_stream, bool attach_on_read) {
+  // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
+  // put the batch at capacity.
+  int buffer_size = 4 * 1024;
+  Init(100 * buffer_size);
+
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+  ASSERT_OK(stream.Init(-1, pin_stream));
+  bool got_write_reservation;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+  ASSERT_TRUE(got_write_reservation);
+  RowBatch* in_batch = CreateIntBatch(0, 1024, false);
+
+  // Construct a stream with 4 pages.
+  const int total_num_pages = 4;
+  while (stream.byte_size() < total_num_pages * buffer_size) {
+    Status status;
+    for (int i = 0; i < in_batch->num_rows(); ++i) {
+      bool ret = stream.AddRow(in_batch->GetRow(i), &status);
+      EXPECT_TRUE(ret);
+      ASSERT_OK(status);
+    }
+  }
+
+  RowBatch* out_batch = pool_.Add(new RowBatch(int_desc_, 100, &tracker_));
+  int num_buffers_attached = 0;
+  int num_flushes = 0;
+  int64_t num_rows_returned = 0;
+  bool got_read_reservation;
+  ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
+  ASSERT_TRUE(got_read_reservation);
+  bool eos;
+  do {
+    ASSERT_EQ(0, out_batch->num_buffers());
+    ASSERT_OK(stream.GetNext(out_batch, &eos));
+    EXPECT_LE(out_batch->num_buffers(), 1) << "Should only attach one buffer at a time";
+    if (out_batch->num_buffers() > 0) {
+      EXPECT_TRUE(out_batch->AtCapacity()) << "Flush resources flag should have been set";
+    }
+    num_buffers_attached += out_batch->num_buffers();
+    for (int i = 0; i < out_batch->num_rows(); ++i) {
+      int slot_offset = int_desc_->tuple_descriptors()[0]->slots()[0]->tuple_offset();
+      TupleRow* in_row = in_batch->GetRow(num_rows_returned % in_batch->num_rows());
+      EXPECT_EQ(*in_row->GetTuple(0)->GetIntSlot(slot_offset),
+          *out_batch->GetRow(i)->GetTuple(0)->GetIntSlot(slot_offset));
+      ++num_rows_returned;
+    }
+    num_flushes += out_batch->flush_mode() == RowBatch::FlushMode::FLUSH_RESOURCES;
+    out_batch->Reset();
+  } while (!eos);
+
+  if (attach_on_read) {
+    EXPECT_EQ(4, num_buffers_attached) << "All buffers attached during iteration.";
+  } else {
+    EXPECT_EQ(0, num_buffers_attached) << "No buffers attached during iteration.";
+  }
+  if (attach_on_read || !pin_stream) EXPECT_EQ(4, num_flushes);
+  out_batch->Reset();
+  stream.Close(out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  if (attach_on_read) {
+    EXPECT_EQ(0, out_batch->num_buffers());
+  } else if (pin_stream) {
+    // All buffers should be attached.
+    EXPECT_EQ(4, out_batch->num_buffers());
+  } else {
+    // Buffer from last pinned page should be attached.
+    EXPECT_EQ(1, out_batch->num_buffers());
+  }
+  in_batch->Reset();
+  out_batch->Reset();
+}
+
+TEST_F(SimpleTupleStreamTest, TestAttachMemoryPinned) {
+  TestAttachMemory(true, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestNoAttachMemoryPinned) {
+  TestAttachMemory(true, false);
+}
+
+TEST_F(SimpleTupleStreamTest, TestAttachMemoryUnpinned) {
+  TestAttachMemory(false, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestNoAttachMemoryUnpinned) {
+  TestAttachMemory(false, false);
+}
+
+// Test for advancing the read/write page with resource flushing.
+void SimpleTupleStreamTest::TestFlushResourcesReadWrite(
+    bool pin_stream, bool attach_on_read) {
+  // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
+  // put the batch at capacity.
+  const int BUFFER_SIZE = 512;
+  const int BATCH_SIZE = 100;
+  // For unpinned streams, we should be able to iterate with only two buffers.
+  const int MAX_PINNED_PAGES = pin_stream ? 1000 : 2;
+  Init(MAX_PINNED_PAGES * BUFFER_SIZE);
+
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, &client_, BUFFER_SIZE, BUFFER_SIZE);
+  ASSERT_OK(stream.Init(-1, pin_stream));
+  bool got_reservation;
+  ASSERT_OK(stream.PrepareForReadWrite(attach_on_read, &got_reservation));
+  ASSERT_TRUE(got_reservation);
+  int num_buffers_attached = 0;
+  /// Read over the pages in different increments.
+  for (int append_batch_size : {1, 10, 100, 1000}) {
+    AppendToReadWriteStream(
+        append_batch_size, BUFFER_SIZE, &num_buffers_attached, &stream);
+  }
+
+  if (attach_on_read) {
+    EXPECT_EQ(stream.byte_size() / BUFFER_SIZE - 1, num_buffers_attached)
+        << "All buffers except the current write page should have been attached";
+  } else {
+    EXPECT_EQ(0, num_buffers_attached);
+  }
+
+  RowBatch* final_out_batch = pool_.Add(new RowBatch(int_desc_, BATCH_SIZE, &tracker_));
+  stream.Close(final_out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  final_out_batch->Reset();
+}
+
+void SimpleTupleStreamTest::AppendToReadWriteStream(int64_t append_batch_size,
+    int64_t buffer_size, int* num_buffers_attached, BufferedTupleStream* stream) {
+  RowBatch* in_batch = CreateIntBatch(0, BATCH_SIZE, false);
+
+  /// Accumulate row batches until we see a flush. The contents of the batches should
+  /// remain valid until the accumulated batches are reset or deleted.
+  vector<unique_ptr<RowBatch>> out_batches;
+  // The start row index of each batch in 'out_batches'.
+  vector<int64_t> out_batch_start_indices;
+  // Iterate over at least 10 pages.
+  int64_t start_byte_size = stream->byte_size();
+  while (stream->byte_size() - start_byte_size < 10 * buffer_size) {
+    Status status;
+    for (int i = 0; i < append_batch_size; ++i) {
+      bool ret = stream->AddRow(
+          in_batch->GetRow(stream->num_rows() % in_batch->num_rows()), &status);
+      EXPECT_TRUE(ret);
+      ASSERT_OK(status);
+    }
+    int64_t rows_read = 0;
+    bool eos;
+    while (rows_read < append_batch_size) {
+      out_batches.emplace_back(new RowBatch(int_desc_, BATCH_SIZE, &tracker_));
+      out_batch_start_indices.push_back(stream->rows_returned());
+      ASSERT_OK(stream->GetNext(out_batches.back().get(), &eos));
+      // Verify the contents of all valid batches to make sure that they haven't become
+      // invalid.
+      LOG(INFO) << "Verifying " << out_batches.size() << " batches";
+      for (int i = 0; i < out_batches.size(); ++i) {
+        VerifyReadWriteBatch(in_batch, out_batches[i].get(), out_batch_start_indices[i]);
+      }
+      *num_buffers_attached += out_batches.back()->num_buffers();
+      rows_read += out_batches.back()->num_rows();
+      EXPECT_EQ(rows_read == append_batch_size, eos);
+      if (out_batches.back().get()->flush_mode()
+          == RowBatch::FlushMode::FLUSH_RESOURCES) {
+        out_batches.clear();
+        out_batch_start_indices.clear();
+      }
+    }
+    EXPECT_EQ(append_batch_size, rows_read);
+    EXPECT_EQ(true, eos);
+  }
+  in_batch->Reset();
+}
+
+void SimpleTupleStreamTest::VerifyReadWriteBatch(
+    RowBatch* in_batch, RowBatch* out_batch, int64_t start_index) {
+  int slot_offset = int_desc_->tuple_descriptors()[0]->slots()[0]->tuple_offset();
+  int64_t row_index = start_index;
+  for (int i = 0; i < out_batch->num_rows(); ++i) {
+    TupleRow* in_row = in_batch->GetRow(row_index++ % in_batch->num_rows());
+    EXPECT_EQ(*in_row->GetTuple(0)->GetIntSlot(slot_offset),
+        *out_batch->GetRow(i)->GetTuple(0)->GetIntSlot(slot_offset));
+  }
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWritePinnedAttach) {
+  TestFlushResourcesReadWrite(true, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWritePinnedNoAttach) {
+  TestFlushResourcesReadWrite(true, false);
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWriteUnpinnedAttach) {
+  TestFlushResourcesReadWrite(false, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWriteUnpinnedNoAttach) {
+  TestFlushResourcesReadWrite(false, false);
+}
+
 // Test that tuple stream functions if it references strings outside stream. The
 // aggregation node relies on this since it updates tuples in-place.
 TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
@@ -805,11 +1018,11 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
 
   DCHECK_EQ(rows_added, stream.num_rows());
 
-  for (int delete_on_read = 0; delete_on_read <= 1; ++delete_on_read) {
+  for (int attach_on_read = 0; attach_on_read <= 1; ++attach_on_read) {
     // Keep stream in memory and test we can read ok.
     vector<StringValue> results;
     bool got_read_reservation;
-    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+    ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
     ASSERT_TRUE(got_read_reservation);
     ReadValues(&stream, string_desc_, &results);
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
@@ -934,8 +1147,8 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   vector<uint8_t> tuple_mem(tuple_desc->byte_size());
   Tuple* write_tuple = reinterpret_cast<Tuple*>(tuple_mem.data());
   write_row->SetTuple(0, write_tuple);
-  StringValue* write_str = reinterpret_cast<StringValue*>(
-      write_tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+  StringValue* write_str =
+      write_tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
   // Make the string large enough to fill a page.
   const int64_t string_len = BIG_ROW_BYTES - tuple_desc->byte_size();
   vector<char> data(string_len);
@@ -961,8 +1174,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
     EXPECT_EQ(1, read_batch.num_rows());
     EXPECT_TRUE(eos);
     Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
-    StringValue* str = reinterpret_cast<StringValue*>(
-        tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+    StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
     EXPECT_EQ(string_len, str->len);
     for (int j = 0; j < string_len; ++j) {
       EXPECT_EQ(i, str->ptr[j]) << j;
@@ -988,8 +1200,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
     EXPECT_EQ(1, read_batch.num_rows());
     EXPECT_EQ(eos, i == MAX_BUFFERS) << i;
     Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
-    StringValue* str = reinterpret_cast<StringValue*>(
-        tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+    StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
     EXPECT_EQ(string_len, str->len);
     for (int j = 0; j < string_len; ++j) {
       ASSERT_EQ(i, str->ptr[j]) << j;
@@ -1117,10 +1328,10 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
   }
 
   for (int i = 0; i < 3; ++i) {
-    bool delete_on_read = i == 2;
+    bool attach_on_read = i == 2;
     vector<StringValue> results;
     bool got_read_reservation;
-    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+    ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
     ASSERT_TRUE(got_read_reservation);
     ReadValues(&stream, string_desc_, &results);
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);

http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 9326507..71175d1 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -47,36 +47,20 @@ using namespace impala;
 using namespace strings;
 
 using BufferHandle = BufferPool::BufferHandle;
+using FlushMode = RowBatch::FlushMode;
 
 BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client,
     int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
   : state_(state),
     desc_(row_desc),
-    node_id_(-1),
     buffer_pool_(state->exec_env()->buffer_pool()),
     buffer_pool_client_(buffer_pool_client),
-    num_pages_(0),
-    total_byte_size_(0),
-    has_read_iterator_(false),
     read_page_reservation_(buffer_pool_client_),
-    read_page_rows_returned_(-1),
-    read_ptr_(nullptr),
-    read_end_ptr_(nullptr),
-    write_ptr_(nullptr),
-    write_end_ptr_(nullptr),
-    rows_returned_(0),
-    has_write_iterator_(false),
-    write_page_(nullptr),
     write_page_reservation_(buffer_pool_client_),
-    bytes_pinned_(0),
-    num_rows_(0),
     default_page_len_(default_page_len),
     max_page_len_(max_page_len),
-    has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
-    delete_on_read_(false),
-    closed_(false),
-    pinned_(true) {
+    has_nullable_tuple_(row_desc->IsAnyTupleNullable()) {
   DCHECK_GE(max_page_len, default_page_len);
   DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len;
   DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len;
@@ -110,10 +94,6 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
   }
 }
 
-BufferedTupleStream::~BufferedTupleStream() {
-  DCHECK(closed_);
-}
-
 void BufferedTupleStream::CheckConsistencyFull() const {
   CheckConsistencyFast();
   // The below checks require iterating over all the pages in the stream.
@@ -139,11 +119,13 @@ void BufferedTupleStream::CheckConsistencyFast() const {
   DCHECK(has_read_iterator() || read_page_ == pages_.end());
   if (read_page_ != pages_.end()) {
     CheckPageConsistency(&*read_page_);
-    DCHECK(read_page_->is_pinned());
-    DCHECK(read_page_->retrieved_buffer);
-    // Can't check read buffer without affecting behaviour, because a read may be in
-    // flight and this would required blocking on that write.
-    DCHECK_GE(read_end_ptr_, read_ptr_);
+    if (!read_page_->attached_to_output_batch) {
+      DCHECK(read_page_->is_pinned());
+      DCHECK(read_page_->retrieved_buffer);
+      // Can't check read buffer without affecting behaviour, because a read may be in
+      // flight and this would require blocking on that write.
+      DCHECK_GE(read_end_ptr_, read_ptr_);
+    }
   }
   if (NeedReadReservation()) {
     DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation())
@@ -159,6 +141,12 @@ void BufferedTupleStream::CheckConsistencyFast() const {
 }
 
 void BufferedTupleStream::CheckPageConsistency(const Page* page) const {
+  if (page->attached_to_output_batch) {
+    /// Read page was just attached to output batch.
+    DCHECK(is_read_page(page)) << page->DebugString();
+    DCHECK(!page->handle.is_open());
+    return;
+  }
   DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << DebugString();
   // Only one large row per page.
   if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1);
@@ -170,7 +158,7 @@ string BufferedTupleStream::DebugString() const {
   stringstream ss;
   ss << "BufferedTupleStream num_rows=" << num_rows_
      << " rows_returned=" << rows_returned_ << " pinned=" << pinned_
-     << " delete_on_read=" << delete_on_read_ << " closed=" << closed_ << "\n"
+     << " attach_on_read=" << attach_on_read_ << " closed=" << closed_ << "\n"
      << " bytes_pinned=" << bytes_pinned_ << " has_write_iterator=" << has_write_iterator_
      << " write_page=" << write_page_ << " has_read_iterator=" << has_read_iterator_
      << " read_page=";
@@ -201,8 +189,23 @@ string BufferedTupleStream::DebugString() const {
   return ss.str();
 }
 
+void BufferedTupleStream::Page::AttachBufferToBatch(
+    BufferedTupleStream* parent, RowBatch* batch, FlushMode flush) {
+  DCHECK(is_pinned());
+  DCHECK(retrieved_buffer);
+  parent->bytes_pinned_ -= len();
+  // ExtractBuffer() cannot fail because the buffer is already in memory.
+  BufferPool::BufferHandle buffer;
+  Status status =
+      parent->buffer_pool_->ExtractBuffer(parent->buffer_pool_client_, &handle, &buffer);
+  DCHECK(status.ok());
+  batch->AddBuffer(parent->buffer_pool_client_, move(buffer), flush);
+  attached_to_output_batch = true;
+}
+
 string BufferedTupleStream::Page::DebugString() const {
-  return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows);
+  return Substitute("$0 num_rows=$1 retrieved_buffer=$2 attached_to_output_batch=$3",
+      handle.DebugString(), num_rows, retrieved_buffer, attached_to_output_batch);
 }
 
 Status BufferedTupleStream::Init(int node_id, bool pinned) {
@@ -214,7 +217,7 @@ Status BufferedTupleStream::Init(int node_id, bool pinned) {
 Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
   // This must be the first iterator created.
   DCHECK(pages_.empty());
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
   CHECK_CONSISTENCY_FULL();
@@ -229,10 +232,10 @@ Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
 }
 
 Status BufferedTupleStream::PrepareForReadWrite(
-    bool delete_on_read, bool* got_reservation) {
+    bool attach_on_read, bool* got_reservation) {
   // This must be the first iterator created.
   DCHECK(pages_.empty());
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
   CHECK_CONSISTENCY_FULL();
@@ -243,20 +246,17 @@ Status BufferedTupleStream::PrepareForReadWrite(
   // Save reservation for both the read and write iterators.
   buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_);
   buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
-  RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read));
+  RETURN_IF_ERROR(PrepareForReadInternal(attach_on_read));
   return Status::OK();
 }
 
-void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) {
+void BufferedTupleStream::Close(RowBatch* batch, FlushMode flush) {
   for (Page& page : pages_) {
+    if (page.attached_to_output_batch) continue; // Already returned.
     if (batch != nullptr && page.retrieved_buffer) {
       // Subtle: We only need to attach buffers from pages that we may have returned
-      // references to. ExtractBuffer() cannot fail for these pages because the data
-      // is guaranteed to already be in -memory.
-      BufferPool::BufferHandle buffer;
-      Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
-      DCHECK(status.ok());
-      batch->AddBuffer(buffer_pool_client_, move(buffer), flush);
+      // references to.
+      page.AttachBufferToBatch(this, batch, flush);
     } else {
       buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
     }
@@ -271,7 +271,9 @@ void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) {
 
 int64_t BufferedTupleStream::CalcBytesPinned() const {
   int64_t result = 0;
-  for (const Page& page : pages_) result += page.pin_count() * page.len();
+  for (const Page& page : pages_) {
+    if (!page.attached_to_output_batch) result += page.pin_count() * page.len();
+  }
   return result;
 }
 
@@ -282,6 +284,7 @@ Status BufferedTupleStream::PinPage(Page* page) {
 }
 
 int BufferedTupleStream::ExpectedPinCount(bool stream_pinned, const Page* page) const {
+  DCHECK(!page->attached_to_output_batch);
   return (stream_pinned || is_read_page(page) || is_write_page(page)) ? 1 : 0;
 }
 
@@ -483,12 +486,11 @@ Status BufferedTupleStream::NextReadPage() {
         && !NeedReadReservation(pinned_, num_pages_, true, true)) {
       buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
     }
-  } else if (delete_on_read_) {
+  } else if (attach_on_read_) {
     DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " "
                                          << DebugString();
     DCHECK_NE(&*read_page_, write_page_);
-    bytes_pinned_ -= pages_.front().len();
-    buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle);
+    DCHECK(read_page_->attached_to_output_batch);
     pages_.pop_front();
     --num_pages_;
     read_page_ = pages_.begin();
@@ -557,12 +559,13 @@ void BufferedTupleStream::InvalidateReadIterator() {
   if (read_page_reservation_.GetReservation() > 0) {
     buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
   }
-  // It is safe to re-read a delete-on-read stream if no rows were read and no pages
+  // It is safe to re-read an attach-on-read stream if no rows were read and no pages
   // were therefore deleted.
-  if (rows_returned_ == 0) delete_on_read_ = false;
+  DCHECK(attach_on_read_ == false || rows_returned_ == 0);
+  if (rows_returned_ == 0) attach_on_read_ = false;
 }
 
-Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_reservation) {
+Status BufferedTupleStream::PrepareForRead(bool attach_on_read, bool* got_reservation) {
   CHECK_CONSISTENCY_FULL();
   InvalidateWriteIterator();
   InvalidateReadIterator();
@@ -570,12 +573,12 @@ Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_reserv
   *got_reservation = pinned_ || pages_.empty()
       || buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
   if (!*got_reservation) return Status::OK();
-  return PrepareForReadInternal(delete_on_read);
+  return PrepareForReadInternal(attach_on_read);
 }
 
-Status BufferedTupleStream::PrepareForReadInternal(bool delete_on_read) {
+Status BufferedTupleStream::PrepareForReadInternal(bool attach_on_read) {
   DCHECK(!closed_);
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   DCHECK(!has_read_iterator());
 
   has_read_iterator_ = true;
@@ -599,7 +602,7 @@ Status BufferedTupleStream::PrepareForReadInternal(bool delete_on_read) {
   }
   read_page_rows_returned_ = 0;
   rows_returned_ = 0;
-  delete_on_read_ = delete_on_read;
+  attach_on_read_ = attach_on_read;
   CHECK_CONSISTENCY_FULL();
   return Status::OK();
 }
@@ -708,6 +711,15 @@ Status BufferedTupleStream::GetNextInternal(
 
   if (UNLIKELY(read_page_ == pages_.end()
           || read_page_rows_returned_ == read_page_->num_rows)) {
+    if (read_page_ != pages_.end() && attach_on_read_
+        && !read_page_->attached_to_output_batch) {
+      DCHECK(has_write_iterator());
+      // We're in a read-write stream in the case where we're at the end of the read page
+      // but the buffer was not attached on the last GetNext() call because the write
+      // iterator had not yet advanced.
+      read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
+      return Status::OK();
+    }
     // Get the next page in the stream (or the first page if read_page_ was not yet
     // initialized.) We need to do this at the beginning of the GetNext() call to ensure
     // the buffer management semantics. NextReadPage() may unpin or delete the buffer
@@ -729,7 +741,7 @@ Status BufferedTupleStream::GetNextInternal(
   // null tuple indicator.
   if (FILL_FLAT_ROWS) {
     DCHECK(flat_rows != nullptr);
-    DCHECK(!delete_on_read_);
+    DCHECK(!attach_on_read_);
     DCHECK_EQ(batch->num_rows(), 0);
     flat_rows->clear();
     flat_rows->reserve(rows_to_fill);
@@ -768,11 +780,28 @@ Status BufferedTupleStream::GetNextInternal(
   rows_returned_ += rows_to_fill;
   read_page_rows_returned_ += rows_to_fill;
   *eos = (rows_returned_ == num_rows_);
-  if (read_page_rows_returned_ == read_page_->num_rows && (!pinned_ || delete_on_read_)) {
-    // No more data in this page. The batch must be immediately returned up the operator
-    // tree and deep copied so that NextReadPage() can reuse the read page's buffer.
-    // TODO: IMPALA-4179 - instead attach the buffer and flush the resources.
-    batch->MarkNeedsDeepCopy();
+  if (read_page_rows_returned_ == read_page_->num_rows) {
+    // No more data in this page. NextReadPage() may need to reuse the reservation
+    // currently used for 'read_page_' so we may need to flush resources. When
+    // 'attach_on_read_' is true, we're returning the buffer. Otherwise the buffer will
+    // be unpinned later but we're returning a reference to the memory so we need to
+    // signal to the caller that the resources are going away. Note that if there is a
+    // read-write page it is not safe to attach the buffer yet because more rows may be
+    // appended to the page.
+    if (attach_on_read_) {
+      if (!has_read_write_page()) {
+        // Safe to attach because we already called GetBuffer() in NextReadPage().
+        // TODO: always flushing for pinned streams is overkill since we may not need
+        // to reuse the reservation immediately. Changing this may require modifying
+        // callers of this class.
+        read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
+      }
+    } else if (!pinned_) {
+      // Flush resources so that we can safely unpin the page on the next GetNext() call.
+      // Note that if this is a read/write page we might not actually do the advance on
+      // the next call to GetNext(). In that case the flush is still safe to do.
+      batch->MarkFlushResources();
+    }
   }
   if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
   DCHECK_LE(read_ptr_, read_end_ptr_);
@@ -1028,7 +1057,7 @@ void BufferedTupleStream::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const
   DCHECK(row != nullptr);
   DCHECK(!closed_);
   DCHECK(is_pinned());
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   uint8_t* data = flat_row;
   return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) :
                                UnflattenTupleRow<false>(&data, row);

http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index 565b5fa..4090ea8 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -93,12 +93,9 @@ class TupleRow;
 /// buffer is needed to keep the row being processed in-memory, but only default-sized
 /// buffers are needed for the other streams being written.
 ///
-/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag
-/// to PrepareForRead() which deletes the stream's pages as it does a final read
-/// pass over the stream.
-///
-/// TODO: IMPALA-4179: the buffer management can be simplified once we can attach
-/// buffers to RowBatches.
+/// The tuple stream also supports an 'attach_on_read' mode, enabled by passing a flag
+/// to PrepareForRead() which attaches the stream's pages to the output batch as it
+/// does a final destructive read pass over the stream.
 ///
 /// Page layout:
 /// Rows are stored back to back starting at the first byte of each page's buffer, with
@@ -162,11 +159,11 @@ class TupleRow;
 /// Read:
 ///   1. Unpinned: Only a single read page is pinned at a time. This means that only
 ///     enough reservation to pin a single page is needed to read the stream, regardless
-///     of the stream's size. Each page is deleted or unpinned (if delete on read is true
+///     of the stream's size. Each page is attached or unpinned (if attach on read is true
 ///     or false respectively) before advancing to the next page.
 ///   2. Pinned: All pages in the stream are pinned so do not need to be pinned or
-///     unpinned when reading from the stream. If delete on read is true, pages are
-///     deleted after being read. If the stream was previously unpinned, the page's data
+///     unpinned when reading from the stream. If attach on read is true, pages are
+///     attached after being read. If the stream was previously unpinned, the page's data
 ///     may not yet be in memory - reading from the stream can block on I/O or fail with
 ///     an I/O error.
 /// Write:
@@ -179,12 +176,17 @@ class TupleRow;
 ///     or free up other memory before retrying.
 ///
 /// Memory lifetime of rows read from stream:
-/// If the stream is pinned and delete on read is false, it is valid to access any tuples
-/// returned via GetNext() until the stream is unpinned. If the stream is unpinned or
-/// delete on read is true, then the batch returned from GetNext() may have the
-/// needs_deep_copy flag set, which means that any tuple memory returned so far from the
-/// stream may be freed on the next call to GetNext().
-/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch.
+/// There are several cases.
+/// 1. If the stream is pinned and attach on read is false, it is valid to access any
+///    tuples returned via GetNext() until the stream is unpinned.
+/// 2. If the stream is in attach on read mode, all buffers referenced by returned rows
+///    are attached to the batches by that GetNext() call or a subsequent one. The
+///    caller is responsible for managing the lifetime of those buffers.
+/// 3. If the stream is unpinned and not in attach on read mode, then the batch returned
+///    from GetNext() may have the FLUSH_RESOURCES flag set, which means that any tuple
+///    memory returned so far from the stream may be freed on the next call to GetNext().
+///    It is *not* safe to return references to rows returned in this mode outside of
+///    the ExecNode.
 ///
 /// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd():
 /// The BufferedTupleStream supports allocation of uninitialized rows with
@@ -197,8 +199,7 @@ class TupleRow;
 /// will not be modified until the stream is read via GetNext().
 /// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow().
 ///
-/// TODO: we need to be able to do read ahead for pages. We need some way to indicate a
-/// page will need to be pinned soon.
+/// TODO: prefetching for pages could speed up iteration over unpinned streams.
 class BufferedTupleStream {
  public:
   /// A pointer to the start of a flattened TupleRow in the stream.
@@ -213,7 +214,7 @@ class BufferedTupleStream {
       int64_t max_page_len,
       const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
 
-  virtual ~BufferedTupleStream();
+  ~BufferedTupleStream() { DCHECK(closed_); }
 
   /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called
   /// once before any of the other APIs.
@@ -233,23 +234,23 @@ class BufferedTupleStream {
   /// Prepares the stream for interleaved reads and writes by saving enough reservation
   /// for default-sized read and write pages. Called after Init() and before the first
   /// AddRow() or AddRowCustomBegin() call.
-  /// 'delete_on_read': Pages are deleted after they are read.
+  /// 'attach_on_read': Pages are attached to the output batch after they are read.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     read and write pages and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
   Status PrepareForReadWrite(
-      bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+      bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Prepares the stream for reading, invalidating the write iterator (if there is one).
   /// Therefore must be called after the last AddRow() or AddRowCustomEnd() and before
   /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes
   /// over the stream, unless rows were read from the stream after PrepareForRead() or
-  /// PrepareForReadWrite() was called with delete_on_read = true.
-  /// 'delete_on_read': Pages are deleted after they are read.
+  /// PrepareForReadWrite() was called with attach_on_read = true.
+  /// 'attach_on_read': Pages are attached to the output batch after they are read.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     first read page and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
-  Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+  Status PrepareForRead(bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Adds a single row to the stream. There are three possible outcomes:
   /// a) The append succeeds. True is returned.
@@ -303,7 +304,10 @@ class BufferedTupleStream {
   enum UnpinMode {
     /// All pages in the stream are unpinned and the read/write positions in the stream
     /// are reset. No more rows can be written to the stream after this. The stream can
-    /// be re-read from the beginning by calling PrepareForRead().
+    /// be re-read from the beginning by calling PrepareForRead(). It is invalid to call
+    /// UnpinStream(UNPIN_ALL) if the stream is in 'attach_on_read' mode and >= 1 row has
+    /// been read from the stream, because this would leave the stream in limbo where it
+    /// still has unpinned pages but it cannot be read or written to.
     UNPIN_ALL,
     /// All pages are unpinned aside from the current read and write pages (if any),
     /// which is left in the same state. The unpinned stream can continue being read
@@ -315,14 +319,21 @@ class BufferedTupleStream {
   void UnpinStream(UnpinMode mode);
 
   /// Get the next batch of output rows, which are backed by the stream's memory.
-  /// If the stream is unpinned or 'delete_on_read' is true, the 'needs_deep_copy'
-  /// flag may be set on 'batch' to signal that memory will be freed on the next
-  /// call to GetNext() and that the caller should copy out any data it needs from
-  /// rows in 'batch' or in previous batches returned from GetNext().
   ///
-  /// If the stream is pinned and 'delete_on_read' is false, the memory backing the
+  /// If the stream is in 'attach_on_read' mode then buffers are attached to 'batch'
+  /// when the last row referencing the buffer is returned. The FLUSH_RESOURCES flag
+  /// is always set when attaching such a buffer.
+  /// TODO: always flushing for pinned streams is overkill since we may not need
+  /// to reuse the reservation immediately. Changing this may require modifying
+  /// callers of this class.
+  ///
+  /// If the stream is unpinned and not in 'attach_on_read' mode, the FLUSH_RESOURCES
+  /// flag may be set on the batch to signal that memory will be freed on the next call
+  /// to GetNext() and that the caller should copy out any data it needs from rows in
+  /// 'batch' or in previous batches returned from GetNext().
+  ///
+  /// If the stream is pinned and 'attach_on_read' is false, the memory backing the
   /// rows will remain valid until the stream is unpinned, destroyed, etc.
-  /// TODO: IMPALA-4179: update when we simplify the memory transfer model.
   Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
 
   /// Same as above, but populate 'flat_rows' with a pointer to the flat version of
@@ -370,8 +381,6 @@ class BufferedTupleStream {
 
   /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
   struct Page {
-    Page() : num_rows(0), retrieved_buffer(true) {}
-
     inline int len() const { return handle.len(); }
     inline bool is_pinned() const { return handle.is_pinned(); }
     inline int pin_count() const { return handle.pin_count(); }
@@ -380,17 +389,27 @@ class BufferedTupleStream {
       retrieved_buffer = true;
       return Status::OK();
     }
+
+    /// Attach the buffer from this page to 'batch'. Only valid to call if the page is
+    /// pinned and 'retrieved_buffer' is true. Decrements parent->bytes_pinned_.
+    void AttachBufferToBatch(
+        BufferedTupleStream* parent, RowBatch* batch, RowBatch::FlushMode flush);
+
     std::string DebugString() const;
 
     BufferPool::PageHandle handle;
 
     /// Number of rows written to the page.
-    int num_rows;
+    int num_rows = 0;
 
     /// Whether we called GetBuffer() on the page since it was last pinned. This means
     /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have
     /// returned rows referencing the page's buffer.
-    bool retrieved_buffer;
+    bool retrieved_buffer = true;
+
+    /// True if the page was attached to the output batch on the last GetNext() call
+    /// while in attach_on_read mode. If true, then 'handle' is closed.
+    bool attached_to_output_batch = false;
   };
 
   /// Runtime state instance used to check for cancellation. Not owned.
@@ -400,7 +419,7 @@ class BufferedTupleStream {
   const RowDescriptor* desc_;
 
   /// Plan node ID, used for error reporting.
-  int node_id_;
+  int node_id_ = -1;
 
   /// The size of the fixed length portion for each tuple in the row.
   std::vector<int> fixed_tuple_sizes_;
@@ -420,18 +439,18 @@ class BufferedTupleStream {
   /// List of pages in the stream.
   /// Empty iff one of two cases applies:
   /// * before the first row has been added with AddRow() or AddRowCustom().
-  /// * after the stream has been destructively read in 'delete_on_read' mode
+  /// * after the stream has been destructively read in 'attach_on_read' mode
   std::list<Page> pages_;
   // IMPALA-5629: avoid O(n) list.size() call by explicitly tracking the number of pages.
   // TODO: remove when we switch to GCC5+, where list.size() is O(1). See GCC bug #49561.
-  int64_t num_pages_;
+  int64_t num_pages_ = 0;
 
-  /// Total size of pages_, including any pages already deleted in 'delete_on_read'
+  /// Total size of pages_, including any pages already deleted in 'attach_on_read'
   /// mode.
-  int64_t total_byte_size_;
+  int64_t total_byte_size_ = 0;
 
   /// True if there is currently an active read iterator for the stream.
-  bool has_read_iterator_;
+  bool has_read_iterator_ = false;
 
   /// The current page being read. When no read iterator is active, equal to list.end().
   /// When a read iterator is active, either points to the current read page, or equals
@@ -447,31 +466,31 @@ class BufferedTupleStream {
   BufferPool::SubReservation read_page_reservation_;
 
   /// Number of rows returned from the current read_page_.
-  uint32_t read_page_rows_returned_;
+  uint32_t read_page_rows_returned_ = -1;
 
   /// Pointer into read_page_ to the byte after the last row read.
-  uint8_t* read_ptr_;
+  uint8_t* read_ptr_ = nullptr;
 
   /// Pointer to one byte past the end of read_page_. Used to detect overruns.
-  const uint8_t* read_end_ptr_;
+  const uint8_t* read_end_ptr_ = nullptr;
 
   /// Pointer into write_page_ to the byte after the last row written.
-  uint8_t* write_ptr_;
+  uint8_t* write_ptr_ = nullptr;
 
   /// Pointer to one byte past the end of write_page_. Cached to speed up computation.
-  const uint8_t* write_end_ptr_;
+  const uint8_t* write_end_ptr_ = nullptr;
 
   /// Number of rows returned to the caller from GetNext() since the last
   /// PrepareForRead() call.
-  int64_t rows_returned_;
+  int64_t rows_returned_ = 0;
 
   /// True if there is currently an active write iterator into the stream.
-  bool has_write_iterator_;
+  bool has_write_iterator_ = false;
 
   /// The current page for writing. NULL if there is no write iterator or no current
   /// write page. Always pinned. Size is 'default_page_len_', except temporarily while
   /// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd().
-  Page* write_page_;
+  Page* write_page_ = nullptr;
 
   /// Saved reservation for write iterator. 'default_page_len_' reservation is saved if
   /// there is a write iterator, no page currently pinned for writing and the possibility
@@ -484,11 +503,11 @@ class BufferedTupleStream {
 
   /// Total bytes of pinned pages in pages_, stored to avoid iterating over the list
   /// to compute it.
-  int64_t bytes_pinned_;
+  int64_t bytes_pinned_ = 0;
 
   /// Number of rows stored in the stream. Includes rows that were already deleted during
-  /// a destructive 'delete_on_read' pass over the stream.
-  int64_t num_rows_;
+  /// a destructive 'attach_on_read' pass over the stream.
+  int64_t num_rows_ = 0;
 
   /// The default length in bytes of pages used to store the stream's rows. All rows that
   /// fit in a default-sized page are stored in a default-sized page.
@@ -503,14 +522,14 @@ class BufferedTupleStream {
   const bool has_nullable_tuple_;
 
-  /// If true, pages are deleted after they are read during this read pass. Once rows
-  /// have been read from a stream with 'delete_on_read_' true, this is always true.
-  bool delete_on_read_;
+  /// If true, a page's buffer is attached to the output batch after the page is read
+  /// during this read pass. Once rows have been read from a stream with
+  /// 'attach_on_read_' true, this is always true.
+  bool attach_on_read_ = false;
 
-  bool closed_; // Used for debugging.
+  bool closed_ = false; // Used for debugging.
 
   /// If true, this stream has been explicitly pinned by the caller and all pages are
   /// kept pinned until the caller calls UnpinStream().
-  bool pinned_;
+  bool pinned_ = true;
 
   bool is_read_page(const Page* page) const {
     return read_page_ != pages_.end() && &*read_page_ == page;
@@ -585,7 +604,7 @@ class BufferedTupleStream {
 
   /// Same as PrepareForRead(), except the iterators are not invalidated and
   /// the caller is assumed to have checked there is sufficient unused reservation.
-  Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT;
+  Status PrepareForReadInternal(bool attach_on_read) WARN_UNUSED_RESULT;
 
   /// Pins the next read page. This blocks reading from disk if necessary to bring the
   /// page's data into memory. Updates read_page_, read_ptr_, and
@@ -593,7 +612,9 @@ class BufferedTupleStream {
   Status NextReadPage() WARN_UNUSED_RESULT;
 
   /// Invalidate the read iterator, and release any resources associated with the active
-  /// iterator.
+  /// iterator. Invalid to call if 'attach_on_read_' is true and one or more rows have
+  /// been read, because that would leave the stream in a state where it still holds
+  /// pages but can no longer be read from or written to.
   void InvalidateReadIterator();
 
   /// Returns the total additional bytes that this row will consume in write_page_ if
@@ -618,7 +639,8 @@ class BufferedTupleStream {
   void UnpinPageIfNeeded(Page* page, bool stream_pinned);
 
   /// Return the expected pin count for 'page' in the current stream based on the current
-  /// read and write pages and whether the stream is pinned.
+  /// read and write pages and whether the stream is pinned. Not valid to call if
+  /// the page was just deleted, i.e. page->attached_to_output_batch == true.
   int ExpectedPinCount(bool stream_pinned, const Page* page) const;
 
   /// Return true if the stream in its current state needs to have a reservation for

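For readers new to this API, a minimal consumer loop for the GetNext() contract
described at the top of this header might look as follows. This is an
illustrative sketch, not code from the patch: it assumes Impala's BE headers
are on the include path and that RowBatch exposes a flush_mode() accessor for
the FLUSH_RESOURCES signal (an assumption, not something this diff shows).

// Illustrative sketch only; see the hedges above.
#include "common/status.h"
#include "runtime/buffered-tuple-stream.h"
#include "runtime/row-batch.h"

namespace impala {

// Drains 'stream' one batch at a time. Whenever FLUSH_RESOURCES is set,
// memory referenced by 'batch' may be freed or transferred on the next
// GetNext() call, so the caller must not retain pointers into earlier rows.
Status DrainStream(BufferedTupleStream* stream, RowBatch* batch) {
  bool eos = false;
  do {
    RETURN_IF_ERROR(stream->GetNext(batch, &eos));
    // ... process the rows in 'batch' here ...
    if (batch->flush_mode() == RowBatch::FlushMode::FLUSH_RESOURCES) {
      // Copy out any row data that must outlive this batch before the
      // backing buffers are released or attached elsewhere.
    }
    batch->Reset();  // Frees or hands off resources attached to the batch.
  } while (!eos);
  return Status::OK();
}

}  // namespace impala
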
http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 67adb9b..90d8c4d 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -418,6 +418,7 @@ class RowBatch {
   friend class RowBatchSerializeBaseline;
   friend class RowBatchSerializeBenchmark;
   friend class RowBatchSerializeTest;
+  friend class SimpleTupleStreamTest;
 
   /// Creates an empty row batch based on the serialized row batch header. Called from
   /// FromProtobuf() above before deserialization of a protobuf row batch.

http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 68f313d..91517f1 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -247,6 +247,10 @@ class Tuple {
     return static_cast<bool*>(GetSlot(offset));
   }
 
+  int32_t* GetIntSlot(int offset) {
+    return static_cast<int32_t*>(GetSlot(offset));
+  }
+
   int64_t* GetBigIntSlot(int offset) {
     return static_cast<int64_t*>(GetSlot(offset));
   }

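As a usage sketch for the new accessor (illustrative only; the surrounding
function and the SlotDescriptor wiring are hypothetical, not from this patch):

#include "runtime/descriptors.h"
#include "runtime/tuple.h"

// Reads an INT slot, returning 0 for NULL. 'slot_desc' is assumed to have
// been resolved from the row's TupleDescriptor elsewhere.
int32_t ReadIntSlot(impala::Tuple* tuple,
    const impala::SlotDescriptor& slot_desc) {
  if (tuple->IsNull(slot_desc.null_indicator_offset())) return 0;
  return *tuple->GetIntSlot(slot_desc.tuple_offset());
}
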

[5/5] impala git commit: IMPALA-7329: Blacklist CDH Maven snapshots repository

Posted by ta...@apache.org.
IMPALA-7329: Blacklist CDH Maven snapshots repository

The Impala development bootstrapping depends on CDH Maven snapshots,
which transitively pull dependencies from other repositories and can
make the build non-reproducible, e.g. IMPALA-7316. This patch makes
the build reproducible by blacklisting cdh.snapshots.repo so that
Maven does not accidentally download the latest CDH snapshots when
running a build, which can cause incompatibility issues.

Testing:
- Ran core tests with CDH_BUILD_NUMBER=422770 and did not hit the
  issue described in IMPALA-7316

Change-Id: Id945bc2769f92f3df3bb4f617b00db77a6502ff3
Reviewed-on: http://gerrit.cloudera.org:8080/10999
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/316b17ac
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/316b17ac
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/316b17ac

Branch: refs/heads/master
Commit: 316b17ac55adb3d1deeb1289b4045688269b201d
Parents: 3f8375d
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Thu Jul 19 19:59:29 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 02:49:48 2018 +0000

----------------------------------------------------------------------
 impala-parent/pom.xml | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/316b17ac/impala-parent/pom.xml
----------------------------------------------------------------------
diff --git a/impala-parent/pom.xml b/impala-parent/pom.xml
index bd091ea..92233f4 100644
--- a/impala-parent/pom.xml
+++ b/impala-parent/pom.xml
@@ -71,6 +71,26 @@ under the License.
       </snapshots>
     </repository>
     <repository>
+      <!--
+      The Impala development bootstrapping depends on CDH Maven snapshots,
+      which transitively pull dependencies from other repositories and can
+      make the build non-reproducible, e.g. IMPALA-7316. This repository is
+      blacklisted so that Maven does not accidentally download the latest
+      CDH snapshots when running a build, which can cause incompatibility
+      issues.
+      -->
+      <id>cdh.snapshots.repo</id>
+      <url>https://repository.cloudera.com/content/repositories/snapshots</url>
+      <name>CDH Snapshots Repository</name>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+    <repository>
       <id>cdh.rcs.releases.repo</id>
       <url>https://repository.cloudera.com/content/groups/cdh-releases-rcs</url>
       <name>CDH Releases Repository</name>


[3/5] impala git commit: IMPALA-7317: add scripts to post flake8 comments

Posted by ta...@apache.org.
IMPALA-7317: add scripts to post flake8 comments

The script is used by a new Jenkins job, gerrit-auto-critic,
to post comments on code reviews.

Testing:
This patch deliberately contains some flake8 violations so
that gerrit-auto-critic will flag them.

Change-Id: I7d348ea4944f829a407bd7b2939654f272736170
Reviewed-on: http://gerrit.cloudera.org:8080/11054
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/47b46067
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/47b46067
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/47b46067

Branch: refs/heads/master
Commit: 47b4606742287361b8c3751b4db77ab5b3508783
Parents: 240fde6
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jul 25 23:38:21 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 02:30:54 2018 +0000

----------------------------------------------------------------------
 bin/jenkins/critique-gerrit-review.py | 130 +++++++++++++++++++++++++++++
 1 file changed, 130 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/47b46067/bin/jenkins/critique-gerrit-review.py
----------------------------------------------------------------------
diff --git a/bin/jenkins/critique-gerrit-review.py b/bin/jenkins/critique-gerrit-review.py
new file mode 100755
index 0000000..3311793
--- /dev/null
+++ b/bin/jenkins/critique-gerrit-review.py
@@ -0,0 +1,130 @@
+#!/usr/bin/python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Usage: critique-gerrit-review.py <git commit>
+#
+# This script is meant to run on a jenkins.impala.io build slave and post back comments
+# to a code review. It does not need to run on all supported platforms, so we use the
+# system python instead of the full Impala virtualenv.
+#
+# This script runs in the context of a source checkout. It posts comments for issues
+# introduced between HEAD^ and HEAD. It picks up metadata from environment variables
+# set by the jenkins gerrit trigger: GERRIT_CHANGE_NUMBER, GERRIT_PATCHSET_NUMBER, etc.
+#
+# It uses the gerrit ssh interface to post the review, connecting as
+# impala-public-jenkins.
+# Ref: https://gerrit-review.googlesource.com/Documentation/cmd-review.html
+#
+# Dependencies:
+# ssh, pip, virtualenv
+#
+# TODO: generalise to other warnings
+# * Lines too long and trailing whitespace
+# * clang-tidy
+
+from collections import defaultdict
+import json
+import os
+from os import environ
+import os.path
+import re
+from subprocess import check_call, Popen, PIPE
+import sys
+import virtualenv
+
+FLAKE8_VERSION = "3.5.0"
+FLAKE8_DIFF_VERSION = "0.2.2"
+
+VENV_PATH = "gerrit_critic_venv"
+VENV_BIN = os.path.join(VENV_PATH, "bin")
+PIP_PATH = os.path.join(VENV_BIN, "pip")
+FLAKE8_DIFF_PATH = os.path.join(VENV_BIN, "flake8-diff")
+
+
+def setup_virtualenv():
+  """Set up virtualenv with flake8-diff."""
+  virtualenv.create_environment(VENV_PATH)
+  check_call([PIP_PATH, "install",
+              "flake8=={0}".format(FLAKE8_VERSION),
+              "flake8-diff=={0}".format(FLAKE8_DIFF_VERSION)])
+
+
+def get_flake8_comments(revision):
+  """Get flake8 warnings for code changes made in the git commit 'revision'.
+  Returns a dict with file path as keys and a list of CommentInput objects. See
+  https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#review-input
+  for information on the format."""
+  comments = defaultdict(lambda: [])
+  # flake8 needs to be on the path.
+  flake8_env = os.environ.copy()
+  flake8_env["PATH"] = "{0}:{1}".format(VENV_BIN, flake8_env["PATH"])
+
+  base_revision = "{0}^".format(revision)
+  flake8_diff_proc = Popen(
+      [FLAKE8_DIFF_PATH, "--standard-flake8-output", "--color", "off", base_revision,
+       revision],
+      stdin=PIPE, stdout=PIPE, stderr=PIPE, env=flake8_env)
+  stdout, stderr = flake8_diff_proc.communicate()
+  # Ignore the return code since it will be non-zero if any violations are found. We want
+  # to continue in that case. Instead check stderr for any errors.
+  if stderr:
+    raise Exception("Did not expect flake8-diff to write to stderr:\n{0}".format(stderr))
+
+  # Match output lines like:
+  #   bin/jenkins/flake8-gerrit-review.py:25:1: F401 'json' imported but unused
+  VIOLATION_RE = re.compile(r"^([^:]*):([0-9]*):([0-9]*): (.*)$")
+
+  for line in stdout.splitlines():
+    match = VIOLATION_RE.match(line)
+    if not match:
+      raise Exception("Pattern did not match line:\n{0}".format(line))
+    file, line, col, details = match.groups()
+    line = int(line)
+    col = int(col)
+    comments_for_file = comments[file]
+    comment = {"message": "flake8: {0}".format(details)}
+    # Heuristic: if the error is on the first column, assume it applies to the whole line.
+    if col == 1:
+      comment["line"] = line
+    else:
+      comment["range"] = {"start_line": line, "end_line": line,
+                          "start_character": col - 1, "end_character": col}
+    comments_for_file.append(comment)
+  return comments
+
+
+def post_review_to_gerrit(review_input):
+  """Post a review to the gerrit patchset. 'review_input' is a ReviewInput JSON object
+  containing the review comments. The gerrit change and patchset are picked up from
+  environment variables set by the gerrit jenkins trigger."""
+  change_num = environ["GERRIT_CHANGE_NUMBER"]
+  patch_num = environ["GERRIT_PATCHSET_NUMBER"]
+  proc = Popen(["ssh", "-p", environ["GERRIT_PORT"],
+                "impala-public-jenkins@" + environ["GERRIT_HOST"], "gerrit", "review",
+                "--project", environ["GERRIT_PROJECT"], "--json",
+                "{0},{1}".format(change_num, patch_num)], stdin=PIPE)
+  proc.communicate(json.dumps(review_input))
+  if proc.returncode != 0:
+    raise Exception("Error posting review to gerrit.")
+
+
+if __name__ == "__main__":
+  setup_virtualenv()
+  review_input = {"comments": get_flake8_comments(sys.argv[1])}
+  print json.dumps(review_input, indent=True)
+  post_review_to_gerrit(review_input)


[4/5] impala git commit: IMPALA-7296: bytes limit for row batch queue

Posted by ta...@apache.org.
IMPALA-7296: bytes limit for row batch queue

https://goo.gl/N9LgQt summarises the memory problems I'm trying to solve
here.

Limit the enqueued row batches by their total bytes, instead of
only by the total number of batches. This helps avoid pathologically
high memory consumption for wide rows, where the cap on the number
of batches does not effectively bound memory consumption.

The bytes limit only lowers the effective capacity of the queue
for wider rows, typically those of 150 bytes or more. These are
exactly the cases where we want to reduce the queue's capacity.

E.g. on a system with 10 disks, the previous sizing gave a queue
of 100 batches. If we assume rows with 10 x 16-byte columns, then
100 batches is ~16MB of data.

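For scale, assuming the default batch capacity of 1024 rows: 10
columns x 16 bytes = 160 bytes per row, so one full batch carries
1024 x 160 bytes (~160KB) and 100 batches carry ~16MB.
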
Remove the RowBatchQueueCapacity counter, which is less relevant now
and was not correctly initialised.

Testing:
Added some basic unit tests.

Add regression test that fails reliably before this change.

Ran exhaustive build.

Change-Id: Iaa06d1d8da2a6d101efda08f620c0bf84a71e681
Reviewed-on: http://gerrit.cloudera.org:8080/10977
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3f8375d3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3f8375d3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3f8375d3

Branch: refs/heads/master
Commit: 3f8375d3e642554b5506f3e731f94e6328fcbcf9
Parents: 47b4606
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Jul 16 00:30:42 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 02:45:21 2018 +0000

----------------------------------------------------------------------
 be/src/exec/scan-node.cc                   |  8 ++-
 be/src/exec/scan-node.h                    |  3 -
 be/src/runtime/row-batch-queue.cc          |  4 +-
 be/src/runtime/row-batch-queue.h           | 21 +++++-
 be/src/util/blocking-queue-test.cc         | 58 +++++++++++++++--
 be/src/util/blocking-queue.h               | 85 +++++++++++++++++++++----
 tests/common/test_dimensions.py            |  5 ++
 tests/query_test/test_mem_usage_scaling.py | 49 ++++++++++++++
 8 files changed, 206 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/exec/scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 4d59eed..85e1953 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -42,6 +42,9 @@ DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in
 DEFINE_int32_hidden(max_queued_row_batches_per_scanner_thread, 5,
     "(Advanced) the maximum number of queued row batches per scanner thread.");
 
+DEFINE_int64(max_queued_row_batch_bytes, 16L * 1024 * 1024,
+    "(Advanced) the maximum bytes of queued rows per multithreaded scan node.");
+
 using boost::algorithm::join;
 
 namespace impala {
@@ -202,8 +205,6 @@ void ScanNode::ScannerThreadState::Prepare(ScanNode* parent) {
       ADD_COUNTER(profile, "RowBatchBytesEnqueued", TUnit::BYTES);
   row_batches_get_timer_ = ADD_TIMER(profile, "RowBatchQueueGetWaitTime");
   row_batches_put_timer_ = ADD_TIMER(profile, "RowBatchQueuePutWaitTime");
-  row_batches_max_capacity_ =
-      profile->AddHighWaterMarkCounter("RowBatchQueueCapacity", TUnit::UNIT);
   row_batches_peak_mem_consumption_ =
       ADD_COUNTER(profile, "RowBatchQueuePeakMemoryUsage", TUnit::BYTES);
 }
@@ -240,7 +241,8 @@ void ScanNode::ScannerThreadState::Open(
   VLOG_QUERY << "Max row batch queue size for scan node '" << parent->id()
       << "' in fragment instance '" << PrintId(state->fragment_instance_id())
       << "': " << max_row_batches;
-  batch_queue_.reset(new RowBatchQueue(max_row_batches));
+  batch_queue_.reset(
+      new RowBatchQueue(max_row_batches, FLAGS_max_queued_row_batch_bytes));
 
   // Start measuring the scanner thread concurrency only once the node is opened.
   average_concurrency_ = parent->runtime_profile()->AddSamplingCounter(

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/exec/scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 1d0728c..9f47c45 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -303,9 +303,6 @@ class ScanNode : public ExecNode {
     /// The wait time for enqueuing a row batch into the row batch queue.
     RuntimeProfile::Counter* row_batches_put_timer_ = nullptr;
 
-    /// Maximum capacity of the row batch queue.
-    RuntimeProfile::HighWaterMarkCounter* row_batches_max_capacity_ = nullptr;
-
     /// Peak memory consumption of the materialized batch queue. Updated in Close().
     RuntimeProfile::Counter* row_batches_peak_mem_consumption_ = nullptr;
   };

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/runtime/row-batch-queue.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-queue.cc b/be/src/runtime/row-batch-queue.cc
index 1fd5555..e694338 100644
--- a/be/src/runtime/row-batch-queue.cc
+++ b/be/src/runtime/row-batch-queue.cc
@@ -23,8 +23,8 @@
 
 namespace impala {
 
-RowBatchQueue::RowBatchQueue(int max_batches)
-  : BlockingQueue<unique_ptr<RowBatch>>(max_batches) {}
+RowBatchQueue::RowBatchQueue(int max_batches, int64_t max_bytes)
+  : BlockingQueue<unique_ptr<RowBatch>, RowBatchBytesFn>(max_batches, max_bytes) {}
 
 RowBatchQueue::~RowBatchQueue() {
   DCHECK(cleanup_queue_.empty());

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/runtime/row-batch-queue.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-queue.h b/be/src/runtime/row-batch-queue.h
index bd2f551..79e8293 100644
--- a/be/src/runtime/row-batch-queue.h
+++ b/be/src/runtime/row-batch-queue.h
@@ -21,6 +21,7 @@
 #include <list>
 #include <memory>
 
+#include "runtime/row-batch.h"
 #include "util/blocking-queue.h"
 #include "util/spinlock.h"
 
@@ -28,18 +29,32 @@ namespace impala {
 
 class RowBatch;
 
+/// Functor that returns the bytes in MemPool chunks for a row batch.
+/// Note that we don't include attached BufferPool::BufferHandle objects because this
+/// queue is only used in scan nodes that don't attach buffers.
+struct RowBatchBytesFn {
+  int64_t operator()(const std::unique_ptr<RowBatch>& batch) {
+    return batch->tuple_data_pool()->total_reserved_bytes();
+  }
+};
+
 /// Extends blocking queue for row batches. Row batches have a property that
 /// they must be processed in the order they were produced, even in cancellation
 /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
 /// and we need to make sure those ptrs stay valid.
 /// Row batches that are added after Shutdown() are queued in a separate "cleanup"
 /// queue, which can be cleaned up during Close().
+///
+/// The queue supports limiting the capacity in terms of bytes enqueued.
+///
 /// All functions are thread safe.
-class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>> {
+class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn> {
  public:
-  /// max_batches is the maximum number of row batches that can be queued.
+  /// 'max_batches' is the maximum number of row batches that can be queued.
+  /// 'max_bytes' is the maximum number of bytes of row batches that can be queued (-1
+  /// means no limit).
   /// When the queue is full, producers will block.
-  RowBatchQueue(int max_batches);
+  RowBatchQueue(int max_batches, int64_t max_bytes);
   ~RowBatchQueue();
 
   /// Adds a batch to the queue. This is blocking if the queue is full.

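As a usage sketch of the two-limit constructor (illustrative only, not part of
the patch; the limits shown are hypothetical and the batch comes from
elsewhere):

#include <memory>
#include "runtime/row-batch-queue.h"

namespace impala {

void QueueExample(std::unique_ptr<RowBatch> batch) {
  // Up to 100 batches, but at most 16MB of queued tuple data as measured by
  // RowBatchBytesFn on each enqueued batch.
  RowBatchQueue queue(100, 16L * 1024 * 1024);
  queue.BlockingPut(std::move(batch));  // Blocks if either limit is reached.

  std::unique_ptr<RowBatch> out;
  if (queue.BlockingGet(&out)) {
    // ... consume 'out'; its bytes are credited back to producers ...
  }
}

}  // namespace impala
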
http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/util/blocking-queue-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue-test.cc b/be/src/util/blocking-queue-test.cc
index f61de21..8a91275 100644
--- a/be/src/util/blocking-queue-test.cc
+++ b/be/src/util/blocking-queue-test.cc
@@ -28,6 +28,14 @@
 
 namespace impala {
 
+/// Functor that returns the size of T.
+template <typename T>
+struct SizeofFn {
+  int64_t operator()(const T& item) {
+    return sizeof(T);
+  }
+};
+
 TEST(BlockingQueueTest, TestBasic) {
   int32_t i;
   BlockingQueue<int32_t> test_queue(5);
@@ -67,12 +75,49 @@ TEST(BlockingQueueTest, TestPutWithTimeout) {
   ASSERT_TRUE(test_queue.BlockingPutWithTimeout(3, timeout_micros));
 }
 
+TEST(BlockingQueueTest, TestBytesLimit) {
+  // 10 bytes => limit of 2 elements
+  BlockingQueue<int32_t, SizeofFn<int32_t>> test_queue(1000, 10);
+  int64_t SHORT_TIMEOUT_MICROS = 1 * 1000L; // 1 millisecond
+  int64_t LONG_TIMEOUT_MICROS = 1000L * 1000L * 60L; // 1 minute
+
+  // First two should succeed.
+  ASSERT_TRUE(test_queue.BlockingPut(1));
+  ASSERT_TRUE(test_queue.BlockingPutWithTimeout(2, SHORT_TIMEOUT_MICROS));
+  EXPECT_EQ(2, test_queue.Size());
+
+  // Put should timeout - no capacity.
+  ASSERT_FALSE(test_queue.BlockingPutWithTimeout(3, SHORT_TIMEOUT_MICROS));
+  EXPECT_EQ(2, test_queue.Size());
+
+  // Test that puts of both types get blocked then unblocked when bytes are
+  // removed from queue.
+  thread put_thread([&] () { test_queue.BlockingPut(4); });
+  thread put_with_timeout_thread([&] () {
+    test_queue.BlockingPutWithTimeout(4, LONG_TIMEOUT_MICROS);
+  });
+  EXPECT_EQ(2, test_queue.Size());
+  int32_t v;
+  EXPECT_TRUE(test_queue.BlockingGet(&v));
+  EXPECT_EQ(1, v);
+  EXPECT_TRUE(test_queue.BlockingGet(&v));
+  EXPECT_EQ(2, v);
+  EXPECT_TRUE(test_queue.BlockingGet(&v));
+  EXPECT_EQ(4, v);
+  EXPECT_TRUE(test_queue.BlockingGet(&v));
+  EXPECT_EQ(4, v);
+
+  put_thread.join();
+  put_with_timeout_thread.join();
+}
+
+template <typename ElemBytesFn>
 class MultiThreadTest { // NOLINT: members are not arranged for minimal padding
  public:
-  MultiThreadTest()
+  MultiThreadTest(int64_t bytes_limit = -1)
     : iterations_(10000),
       nthreads_(5),
-      queue_(iterations_*nthreads_/10),
+      queue_(iterations_*nthreads_/10, bytes_limit),
       num_inserters_(nthreads_) {
   }
 
@@ -134,7 +179,7 @@ class MultiThreadTest { // NOLINT: members are not arranged for minimal padding
 
   int iterations_;
   int nthreads_;
-  BlockingQueue<int32_t> queue_;
+  BlockingQueue<int32_t, ElemBytesFn> queue_;
   // Lock for gotten_ and num_inserters_.
   mutex lock_;
   // Map from inserter thread id to number of consumed elements from that id.
@@ -148,7 +193,12 @@ class MultiThreadTest { // NOLINT: members are not arranged for minimal padding
 };
 
 TEST(BlockingQueueTest, TestMultipleThreads) {
-  MultiThreadTest test;
+  MultiThreadTest<ByteLimitDisabledFn<int32_t>> test;
+  test.Run();
+}
+
+TEST(BlockingQueueTest, TestMultipleThreadsWithBytesLimit) {
+  MultiThreadTest<SizeofFn<int32_t>> test(100);
   test.Run();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index 8b07dc5..34c2453 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -34,28 +34,46 @@
 
 namespace impala {
 
+/// Default functor that always returns 0 bytes. This disables the byte limit
+/// functionality for the queue.
+template <typename T>
+struct ByteLimitDisabledFn {
+  int64_t operator()(const T& item) {
+    return 0;
+  }
+};
+
 /// Fixed capacity FIFO queue, where both BlockingGet() and BlockingPut() operations block
 /// if the queue is empty or full, respectively.
 ///
+/// The queue always has a hard maximum capacity of elements. It also has an optional
+/// limit on the bytes enqueued. This limit is a soft limit: one element can always be
+/// enqueued regardless of its size in bytes. In order to use the bytes limit, the queue
+/// must be instantiated with a functor that returns the size in bytes of an enqueued
+/// item. The functor may be invoked multiple times for the same item and must always
+/// return the same value for it.
+///
 /// FIFO is made up of a 'get_list_' that BlockingGet() consumes from and a 'put_list_'
 /// that BlockingPut() enqueues into. They are protected by 'get_lock_' and 'put_lock_'
 /// respectively. If both locks need to be held at the same time, 'get_lock_' must be
 /// held before 'put_lock_'. When the 'get_list_' is empty, the caller of BlockingGet()
 /// will atomically swap the 'put_list_' with 'get_list_'. The swapping happens with both
 /// the 'get_lock_' and 'put_lock_' held.
-template <typename T>
+template <typename T, typename ElemBytesFn = ByteLimitDisabledFn<T>>
 class BlockingQueue : public CacheLineAligned {
  public:
-  BlockingQueue(size_t max_elements)
+  BlockingQueue(size_t max_elements, int64_t max_bytes = -1)
     : shutdown_(false),
       max_elements_(max_elements),
       total_put_wait_time_(0),
       get_list_size_(0),
-      total_get_wait_time_(0) {
+      total_get_wait_time_(0),
+      max_bytes_(max_bytes) {
+    DCHECK(max_bytes == -1 || max_bytes > 0) << max_bytes;
     DCHECK_GT(max_elements_, 0);
     // Make sure class members commonly used in BlockingPut() don't alias with class
-    // members used in BlockingGet(). 'pad_' is the point of division.
-    DCHECK_NE(offsetof(BlockingQueue, pad_) / 64,
+    // members used in BlockingGet(). 'put_bytes_enqueued_' is the point of division.
+    DCHECK_NE(offsetof(BlockingQueue, put_bytes_enqueued_) / 64,
         offsetof(BlockingQueue, get_lock_) / 64);
   }
 
@@ -96,11 +114,20 @@ class BlockingQueue : public CacheLineAligned {
     get_list_.pop_front();
     get_list_size_.Store(get_list_.size());
     read_lock.unlock();
+    int64_t val_bytes = ElemBytesFn()(*out);
+    DCHECK_GE(val_bytes, 0);
+    get_bytes_dequeued_.Add(val_bytes);
     // Note that there is a race with any writer if NotifyOne() is called between when
     // a writer checks the queue size and when it calls put_cv_.Wait(). If this race
     // occurs, a writer can stay blocked even if the queue is not full until the next
     // BlockingGet(). The race is benign correctness wise as BlockingGet() will always
     // notify a writer with 'put_lock_' held when both lists are empty.
+    //
+    // Relatedly, if multiple writers hit the bytes limit of the queue and queue
+    // elements vary in size, we may not unblock every writer that could now proceed.
+    // E.g. if two writers are each waiting to enqueue an element of N bytes and we
+    // dequeue an element of 2N bytes, there is room for both, but NotifyOne() wakes
+    // only one of them. This is also benign correctness-wise because we will continue
+    // to make progress.
     put_cv_.NotifyOne();
     return true;
   }
@@ -112,9 +139,10 @@ class BlockingQueue : public CacheLineAligned {
   template <typename V>
   bool BlockingPut(V&& val) {
     MonotonicStopWatch timer;
+    int64_t val_bytes = ElemBytesFn()(val);
+    DCHECK_GE(val_bytes, 0);
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
-
-    while (SizeLocked(write_lock) >= max_elements_ && !shutdown_) {
+    while (!HasCapacityInternal(write_lock, val_bytes) && !shutdown_) {
       timer.Start();
       put_cv_.Wait(write_lock);
       timer.Stop();
@@ -123,6 +151,7 @@ class BlockingQueue : public CacheLineAligned {
     if (UNLIKELY(shutdown_)) return false;
 
     DCHECK_LT(put_list_.size(), max_elements_);
+    put_bytes_enqueued_ += val_bytes;
     Put(std::forward<V>(val));
     write_lock.unlock();
     get_cv_.NotifyOne();
@@ -137,11 +166,13 @@ class BlockingQueue : public CacheLineAligned {
   template <typename V>
   bool BlockingPutWithTimeout(V&& val, int64_t timeout_micros) {
     MonotonicStopWatch timer;
+    int64_t val_bytes = ElemBytesFn()(val);
+    DCHECK_GE(val_bytes, 0);
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
     timespec abs_time;
     TimeFromNowMicros(timeout_micros, &abs_time);
     bool notified = true;
-    while (SizeLocked(write_lock) >= max_elements_ && !shutdown_ && notified) {
+    while (!HasCapacityInternal(write_lock, val_bytes) && !shutdown_ && notified) {
       timer.Start();
       // Wait until we're notified or until the timeout expires.
       notified = put_cv_.WaitUntil(write_lock, abs_time);
@@ -152,8 +183,9 @@ class BlockingQueue : public CacheLineAligned {
     // NOTE: We don't check 'notified' here as it appears that pthread condition variables
     // have a weird behavior in which they can return ETIMEDOUT from timed_wait even if
     // another thread did in fact signal it.
-    if (SizeLocked(write_lock) >= max_elements_ || shutdown_) return false;
+    if (!HasCapacityInternal(write_lock, val_bytes) || shutdown_) return false;
     DCHECK_LT(put_list_.size(), max_elements_);
+    put_bytes_enqueued_ += val_bytes;
     Put(std::forward<V>(val));
     write_lock.unlock();
     get_cv_.NotifyOne();
@@ -203,6 +235,26 @@ class BlockingQueue : public CacheLineAligned {
     return get_list_size_.Load() + put_list_.size();
   }
 
+  /// Return true if the queue has capacity to add one more element with size 'val_bytes'.
+  /// Caller must hold 'put_lock_' via 'lock'.
+  bool HasCapacityInternal(
+      const boost::unique_lock<boost::mutex>& lock, int64_t val_bytes) {
+    DCHECK(lock.mutex() == &put_lock_ && lock.owns_lock());
+    uint32_t size = SizeLocked(lock);
+    if (size >= max_elements_) return false;
+    if (val_bytes == 0 || max_bytes_ == -1 || size == 0) return true;
+
+    // At this point we can enqueue the item if there is sufficient bytes capacity.
+    if (put_bytes_enqueued_ + val_bytes <= max_bytes_) return true;
+
+    // No bytes capacity left - swap over the dequeued bytes to account for elements
+    // the consumer has dequeued. All decrementers of 'get_bytes_dequeued_' hold
+    // 'put_lock_', so races with other decrementers are impossible.
+    int64_t dequeued = get_bytes_dequeued_.Swap(0);
+    put_bytes_enqueued_ -= dequeued;
+    return put_bytes_enqueued_ + val_bytes <= max_bytes_;
+  }
+
   /// Overloads for inserting an item into the list, depending on whether it should be
   /// moved or copied.
   void Put(const T& val) { put_list_.push_back(val); }
@@ -227,9 +279,10 @@ class BlockingQueue : public CacheLineAligned {
   /// Total amount of time threads blocked in BlockingPut(). Guarded by 'put_lock_'.
   int64_t total_put_wait_time_;
 
-  /// Padding to avoid data structures used in BlockingGet() to share cache lines
-  /// with data structures used in BlockingPut().
-  int64_t pad_;
+  /// Running counter of bytes enqueued, incremented by the producer threads.
+  /// Decremented by transferring value from 'get_bytes_dequeued_'.
+  /// Guarded by 'put_lock_'.
+  int64_t put_bytes_enqueued_ = 0;
 
   /// Guards against concurrent access to 'get_list_'.
   mutable boost::mutex get_lock_;
@@ -249,6 +302,14 @@ class BlockingQueue : public CacheLineAligned {
   /// variable doesn't include the time which other threads block waiting for 'get_lock_'.
   int64_t total_get_wait_time_;
 
+  /// Running count of bytes dequeued. Subtracted from 'put_bytes_enqueued_' when that
+  /// counter exceeds the queue's byte capacity. Kept separate from
+  /// 'put_bytes_enqueued_' so that producers and consumers do not update the same cache
+  /// line on every put and get. Decrementers must hold 'put_lock_'.
+  AtomicInt64 get_bytes_dequeued_{0};
+
+  /// Soft limit on total bytes in queue. -1 if no limit.
+  const int64_t max_bytes_;
 };
 
 }

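To make the two-counter accounting in HasCapacityInternal() concrete, here is
a self-contained toy sketch of the same design (not the Impala class itself).
The producer owns its running total (the single-threaded demo stands in for
holding 'put_lock_') and only reconciles the consumer's atomic dequeue counter
when it appears to be out of byte capacity:

// Toy illustration of the byte-accounting design; not Impala code.
#include <atomic>
#include <cstdint>
#include <iostream>

class ByteBudget {
 public:
  explicit ByteBudget(int64_t max_bytes) : max_bytes_(max_bytes) {}

  // Producer side (caller is assumed to hold the put-side lock). Soft limit:
  // an item always fits when the queue is empty.
  bool TryEnqueue(int64_t val_bytes, int64_t queue_size) {
    if (queue_size == 0 || put_bytes_enqueued_ + val_bytes <= max_bytes_) {
      put_bytes_enqueued_ += val_bytes;
      return true;
    }
    // Out of capacity: fold in whatever the consumer dequeued since the last
    // reconciliation, then re-check.
    put_bytes_enqueued_ -= get_bytes_dequeued_.exchange(0);
    if (put_bytes_enqueued_ + val_bytes > max_bytes_) return false;
    put_bytes_enqueued_ += val_bytes;
    return true;
  }

  // Consumer side: a cheap atomic add that does not touch the producer's
  // counter, so the two sides do not contend on every operation.
  void OnDequeue(int64_t val_bytes) { get_bytes_dequeued_ += val_bytes; }

 private:
  const int64_t max_bytes_;
  int64_t put_bytes_enqueued_ = 0;              // Producer-owned.
  std::atomic<int64_t> get_bytes_dequeued_{0};  // Incremented by consumers.
};

int main() {
  ByteBudget budget(10);
  std::cout << budget.TryEnqueue(6, 0) << "\n";  // 1: empty queue always fits
  std::cout << budget.TryEnqueue(6, 1) << "\n";  // 0: 12 bytes would exceed 10
  budget.OnDequeue(6);                           // consumer frees 6 bytes
  std::cout << budget.TryEnqueue(6, 1) << "\n";  // 1: fits after reconciling
  return 0;
}
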
http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/tests/common/test_dimensions.py
----------------------------------------------------------------------
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 0460ea7..af1f8e1 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -108,6 +108,11 @@ def create_parquet_dimension(workload):
   return ImpalaTestDimension('table_format',
       TableFormatInfo.create_from_string(dataset, 'parquet/none'))
 
+def create_avro_snappy_dimension(workload):
+  dataset = get_dataset_from_workload(workload)
+  return ImpalaTestDimension('table_format',
+      TableFormatInfo.create_from_string(dataset, 'avro/snap/block'))
+
 # Common sets of values for the exec option vectors
 ALL_BATCH_SIZES = [0]
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3f8375d3/tests/query_test/test_mem_usage_scaling.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mem_usage_scaling.py b/tests/query_test/test_mem_usage_scaling.py
index 12316db..6535feb 100644
--- a/tests/query_test/test_mem_usage_scaling.py
+++ b/tests/query_test/test_mem_usage_scaling.py
@@ -19,6 +19,7 @@ import pytest
 from copy import copy
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.test_dimensions import create_avro_snappy_dimension
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfNotHdfsMinicluster
@@ -300,3 +301,51 @@ class TestTpcdsMemLimitError(TestLowMemoryLimits):
   def test_low_mem_limit_q53(self, vector):
     self.low_memory_limit_test(
         vector, 'tpcds-decimal_v2-q53', self.MIN_MEM_FOR_TPCDS['q53'])
+
+
+@SkipIfNotHdfsMinicluster.tuned_for_minicluster
+class TestScanMemLimit(ImpalaTestSuite):
+  """Targeted test for scan memory limits."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestScanMemLimit, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.ImpalaTestMatrix.add_dimension(create_avro_snappy_dimension(cls.get_workload()))
+
+  def test_wide_avro_mem_usage(self, vector, unique_database):
+    """Create a wide avro table with large strings and test scans that can cause OOM."""
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip("only run resource-intensive query on exhaustive")
+    NUM_COLS = 250
+    NUM_ROWS = 50000
+    TBL = "wide_250_cols"
+    # This query caused OOM with the below memory limit before the IMPALA-7296 fix.
+    # When the sort starts to spill it causes row batches to accumulate rapidly in the
+    # scan node's queue.
+    SELECT_QUERY = """select * from {0}.{1} order by col224 limit 100""".format(
+        unique_database, TBL)
+    # Use disable_outermost_topn to enable spilling sort but prevent returning excessive
+    # rows. Limit NUM_SCANNER_THREADS to avoid higher memory consumption on systems with
+    # many cores (each scanner thread uses some memory in addition to the queued memory).
+    SELECT_OPTIONS = {
+        'mem_limit': "256MB", 'disable_outermost_topn': True, "NUM_SCANNER_THREADS": 1}
+    self.execute_query_expect_success(self.client,
+        "create table {0}.{1} ({2}) stored as avro".format(unique_database, TBL,
+         ",".join(["col{0} STRING".format(i) for i in range(NUM_COLS)])))
+    self.run_stmt_in_hive("""
+        SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
+        SET mapred.output.compression.type=BLOCK;
+        SET hive.exec.compress.output=true;
+        SET avro.output.codec=snappy;
+        insert into {0}.{1} select {2} from tpch_parquet.lineitem
+        limit {3}
+        """.format(unique_database, TBL, ','.join(['l_comment'] * NUM_COLS), NUM_ROWS))
+    self.execute_query_expect_success(self.client,
+        "refresh {0}.{1}".format(unique_database, TBL))
+
+    self.execute_query_expect_success(self.client, SELECT_QUERY, SELECT_OPTIONS)