Posted to commits@impala.apache.org by ta...@apache.org on 2020/04/24 22:48:25 UTC

[impala] branch master updated (4386c1b -> 58273ff)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 4386c1b  IMPALA-9677: Fix frontend tests using a non-existent S3 bucket
     new 704ba38  IMPALA-9647: Exclude fluent-hc-4.3.2.jar from fe/pom.xml
     new b2d9901  IMPALA-9176: shared null-aware anti-join build
     new 58273ff  IMPALA-9609: Minimize Frontend activity in executor only impalads

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/blocking-join-node.cc                  |  15 -
 be/src/exec/blocking-join-node.h                   |   3 +-
 .../blocking-join-node.inline.h}                   |  30 +-
 be/src/exec/data-source-scan-node.cc               |   1 +
 be/src/exec/exec-node.cc                           |   8 -
 be/src/exec/exec-node.h                            |   2 +-
 .../flat_buffer.h => exec/exec-node.inline.h}      |  22 +-
 be/src/exec/grouping-aggregator-partition.cc       |   2 +-
 be/src/exec/grouping-aggregator.cc                 |   1 +
 be/src/exec/grouping-aggregator.h                  |   3 +-
 be/src/exec/hbase-scan-node.cc                     |   1 +
 be/src/exec/hdfs-avro-scanner-ir.cc                |   1 +
 be/src/exec/hdfs-columnar-scanner-ir.cc            |   1 +
 be/src/exec/hdfs-orc-scanner.cc                    |   1 +
 be/src/exec/hdfs-rcfile-scanner.cc                 |   1 +
 be/src/exec/hdfs-scanner.cc                        |   3 +-
 be/src/exec/hdfs-scanner.h                         |   1 +
 be/src/exec/hdfs-text-scanner.cc                   |   1 +
 be/src/exec/kudu-scanner.cc                        |   1 +
 be/src/exec/nested-loop-join-node.cc               |   2 +
 be/src/exec/non-grouping-aggregator.cc             |   1 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |   1 +
 be/src/exec/partitioned-hash-join-builder.cc       |  75 +++-
 be/src/exec/partitioned-hash-join-builder.h        |  40 ++-
 be/src/exec/partitioned-hash-join-node-ir.cc       |   2 +
 be/src/exec/partitioned-hash-join-node.cc          | 145 +++-----
 be/src/exec/partitioned-hash-join-node.h           |  15 +-
 be/src/exec/select-node-ir.cc                      |   2 +
 be/src/exec/unnest-node.cc                         |   1 +
 be/src/runtime/buffered-tuple-stream-test.cc       |  94 ++++-
 be/src/runtime/buffered-tuple-stream.cc            | 397 +++++++++++----------
 be/src/runtime/buffered-tuple-stream.h             | 267 ++++++++++----
 be/src/runtime/buffered-tuple-stream.inline.h      |  36 ++
 be/src/runtime/bufferpool/buffer-pool-internal.h   |   8 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc      |   2 +-
 be/src/runtime/bufferpool/buffer-pool.cc           |  24 +-
 be/src/runtime/bufferpool/buffer-pool.h            |  17 +-
 be/src/runtime/exec-env.cc                         |  11 +-
 be/src/util/debug-util.cc                          |   1 +
 be/src/util/debug-util.h                           |   1 +
 common/thrift/generate_error_codes.py              |   5 +
 fe/pom.xml                                         |   7 +
 .../org/apache/impala/planner/HashJoinNode.java    |   1 -
 .../java/org/apache/impala/planner/JoinNode.java   |   2 -
 .../java/org/apache/impala/service/Frontend.java   |   7 -
 .../org/apache/impala/service/JniFrontend.java     |  44 ++-
 .../queries/PlannerTest/tpch-all.test              |   4 +-
 .../spilling-naaj-no-deny-reservation.test         |   1 +
 .../QueryTest/spilling-no-debug-action.test        |   3 +-
 tests/custom_cluster/test_coordinators.py          |  18 +
 tests/query_test/test_spilling.py                  |   2 +-
 51 files changed, 858 insertions(+), 476 deletions(-)
 copy be/src/{runtime/string-value.cc => exec/blocking-join-node.inline.h} (53%)
 copy be/src/{util/flat_buffer.h => exec/exec-node.inline.h} (64%)


[impala] 01/03: IMPALA-9647: Exclude fluent-hc-4.3.2.jar from fe/pom.xml

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 704ba38ae1b23d34829cf9a9b77f18b0b0235234
Author: David Knupp <dk...@cloudera.com>
AuthorDate: Mon Apr 20 10:17:29 2020 -0700

    IMPALA-9647: Exclude fluent-hc-4.3.2.jar from fe/pom.xml
    
    fluent-hc has been flagged in CVE-2014-3577 and CVE-2015-5262, and
    doesn't seem to be a required component for us.
    
    Tested first by confirming that the following line no longer shows
    up in the output of mvn dependency:tree after the change:
    
      +- org.apache.httpcomponents:fluent-hc:jar:4.3.2:compile
    
    Then confirmed that upstream tests on jenkins.impala.io pass.
    
    Change-Id: I245ddb50e17d49295b1a1ebaef6c17e1b92304f1
    Reviewed-on: http://gerrit.cloudera.org:8080/15760
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/pom.xml | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/fe/pom.xml b/fe/pom.xml
index d29c525..61e26a2 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -142,6 +142,11 @@ under the License.
       <version>${hudi.version}</version>
         <exclusions>
           <exclusion>
+            <!-- IMPALA-9647 (re: CVE-2014-3577, CVE-2015-5262) -->
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>fluent-hc</artifactId>
+          </exclusion>
+          <exclusion>
             <groupId>org.apache.parquet</groupId>
             <artifactId>parquet-avro</artifactId>
           </exclusion>
@@ -751,6 +756,8 @@ under the License.
                     <exclude>org.apache.logging.log4j:log4j-1.2-api</exclude>
                     <!-- IMPALA-9108: Avoid pulling in leveldbjni, which is unneeded. -->
                     <exclude>org.fusesource.leveldbjni:*</exclude>
+                    <!-- IMPALA-9647 (re: CVE-2014-3577, CVE-2015-5262) -->
+                    <exclude>org.apache.httpcomponents:fluent-hc</exclude>
                     <!-- Assert that we only use artifacts from only the specified
                          version of these components. -->
                     <exclude>org.apache.hadoop:*</exclude>


[impala] 03/03: IMPALA-9609: Minimize Frontend activity in executor only impalads

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 58273fff601dcc763ac43f7cc275a174a2e18b6b
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Fri Apr 24 00:50:39 2020 +0200

    IMPALA-9609: Minimize Frontend activity in executor only impalads
    
    Until now the Frontend started fully regardless of the
    is_coordinator flag, e.g. it created connections to the HMS,
    which is both error prone and can DoS the metastore. (Note
    that even coordinators only started connecting to the HMS in
    the recent past, due to local catalog mode and ACID
    transactions.)
    
    Executor-only impalads still need a JVM, as queries can contain
    Java calls (HDFS/HBase API calls, Hive UDFs), but most of the JNI API
    provided by JniFrontend shouldn't be called by executors. It seems
    that the whole Frontend object is needed only by coordinators.
    
    Testing:
    - in general, executor-only mode doesn't seem to be well covered
    - ran test_coordinators.py, which has some tests with executor-only
      impalads
    - added a new test for HBase tables (Hive UDFs and HDFS were already
      covered)
    
    Change-Id: I4627e5e3520175153cb49e24fd480815dfefdae1
    Reviewed-on: http://gerrit.cloudera.org:8080/15793
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/exec-env.cc                         | 11 ++++--
 .../java/org/apache/impala/service/Frontend.java   |  7 ----
 .../org/apache/impala/service/JniFrontend.java     | 44 +++++++++++++++++++---
 tests/custom_cluster/test_coordinators.py          | 18 +++++++++
 4 files changed, 63 insertions(+), 17 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index a7bbaf7..c6e7844 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -284,6 +284,7 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
   cluster_membership_mgr_.reset(new ClusterMembershipMgr(
       PrintId(backend_id_), statestore_subscriber_.get(), metrics_.get()));
 
+  // TODO: Consider removing AdmissionController from executor only impalads.
   admission_controller_.reset(
       new AdmissionController(cluster_membership_mgr_.get(), statestore_subscriber_.get(),
           request_pool_service_.get(), metrics_.get(), configured_backend_address_));
@@ -430,10 +431,12 @@ Status ExecEnv::Init() {
   }
 
   RETURN_IF_ERROR(cluster_membership_mgr_->Init());
-  cluster_membership_mgr_->RegisterUpdateCallbackFn(
-      [this](ClusterMembershipMgr::SnapshotPtr snapshot) {
-        SendClusterMembershipToFrontend(snapshot, this->frontend());
-      });
+  if (FLAGS_is_coordinator) {
+    cluster_membership_mgr_->RegisterUpdateCallbackFn(
+        [this](ClusterMembershipMgr::SnapshotPtr snapshot) {
+          SendClusterMembershipToFrontend(snapshot, this->frontend());
+        });
+  }
 
   RETURN_IF_ERROR(admission_controller_->Init());
 
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index fb7ca82..1715233 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -372,13 +372,6 @@ public class Frontend {
   }
 
   /**
-   * Update the cluster membership snapshot with the latest snapshot from the backend.
-   */
-  public void updateExecutorMembership(TUpdateExecutorMembershipRequest req) {
-    ExecutorMembershipSnapshot.update(req);
-  }
-
-  /**
    * Constructs a TCatalogOpRequest and attaches it, plus any metadata, to the
    * result argument.
    */
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 2d47268..b572f8a 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -95,6 +95,7 @@ import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateExecutorMembershipRequest;
 import org.apache.impala.util.AuthorizationUtil;
+import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.impala.util.GlogAppender;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.TSessionStateUtil;
@@ -134,11 +135,16 @@ public class JniFrontend {
 
     GlogAppender.Install(TLogLevel.values()[cfg.impala_log_lvl],
         TLogLevel.values()[cfg.non_impala_java_vlog]);
-
-    final AuthorizationFactory authzFactory =
-        AuthorizationUtil.authzFactoryFrom(BackendConfig.INSTANCE);
     LOG.info(JniUtil.getJavaVersion());
-    frontend_ = new Frontend(authzFactory);
+
+    if (cfg.is_coordinator) {
+      final AuthorizationFactory authzFactory =
+          AuthorizationUtil.authzFactoryFrom(BackendConfig.INSTANCE);
+      frontend_ = new Frontend(authzFactory);
+    } else {
+      // Avoid instantiating Frontend in executor only impalads.
+      frontend_ = null;
+    }
   }
 
   /**
@@ -147,6 +153,7 @@ public class JniFrontend {
    */
   public byte[] createExecRequest(byte[] thriftQueryContext)
       throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TQueryCtx queryCtx = new TQueryCtx();
     JniUtil.deserializeThrift(protocolFactory_, queryCtx, thriftQueryContext);
 
@@ -168,6 +175,7 @@ public class JniFrontend {
 
   // Deserialize and merge each thrift catalog update into a single merged update
   public byte[] updateCatalogCache(byte[] req) throws ImpalaException, TException {
+    Preconditions.checkNotNull(frontend_);
     TUpdateCatalogCacheRequest request = new TUpdateCatalogCacheRequest();
     JniUtil.deserializeThrift(protocolFactory_, request, req);
     return new TSerializer(protocolFactory_).serialize(
@@ -182,7 +190,7 @@ public class JniFrontend {
       throws ImpalaException {
     TUpdateExecutorMembershipRequest req = new TUpdateExecutorMembershipRequest();
     JniUtil.deserializeThrift(protocolFactory_, req, thriftMembershipUpdate);
-    frontend_.updateExecutorMembership(req);
+    ExecutorMembershipSnapshot.update(req);
   }
 
   /**
@@ -193,6 +201,7 @@ public class JniFrontend {
    */
   public byte[] loadTableData(byte[] thriftLoadTableDataParams)
       throws ImpalaException, IOException {
+    Preconditions.checkNotNull(frontend_);
     TLoadDataReq request = new TLoadDataReq();
     JniUtil.deserializeThrift(protocolFactory_, request, thriftLoadTableDataParams);
     TLoadDataResp response = frontend_.loadTableData(request);
@@ -209,6 +218,7 @@ public class JniFrontend {
    * This call is thread-safe.
    */
   public String getExplainPlan(byte[] thriftQueryContext) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TQueryCtx queryCtx = new TQueryCtx();
     JniUtil.deserializeThrift(protocolFactory_, queryCtx, thriftQueryContext);
     String plan = frontend_.getExplainString(queryCtx);
@@ -217,6 +227,7 @@ public class JniFrontend {
   }
 
   public byte[] getCatalogMetrics() throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetCatalogMetricsResult metrics = frontend_.getCatalogMetrics();
     TSerializer serializer = new TSerializer(protocolFactory_);
     try {
@@ -240,6 +251,7 @@ public class JniFrontend {
    * @see Frontend#getTableNames
    */
   public byte[] getTableNames(byte[] thriftGetTablesParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetTablesParams params = new TGetTablesParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams);
     // If the session was not set it indicates this is an internal Impala call.
@@ -269,6 +281,7 @@ public class JniFrontend {
    * @see Frontend#getTableFiles
    */
   public byte[] getTableFiles(byte[] thriftShowFilesParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TShowFilesParams params = new TShowFilesParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftShowFilesParams);
     TResultSet result = frontend_.getTableFiles(params);
@@ -291,6 +304,7 @@ public class JniFrontend {
    * @see Frontend#getDbs
    */
   public byte[] getDbs(byte[] thriftGetTablesParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetDbsParams params = new TGetDbsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftGetTablesParams);
     // If the session was not set it indicates this is an internal Impala call.
@@ -318,6 +332,7 @@ public class JniFrontend {
    * @see Frontend#getDataSrcs
    */
   public byte[] getDataSrcMetadata(byte[] thriftParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetDataSrcsParams params = new TGetDataSrcsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
 
@@ -342,6 +357,7 @@ public class JniFrontend {
   }
 
   public byte[] getStats(byte[] thriftShowStatsParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TShowStatsParams params = new TShowStatsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftShowStatsParams);
     Preconditions.checkState(params.isSetTable_name());
@@ -369,6 +385,7 @@ public class JniFrontend {
    * @see Frontend#getTableNames
    */
   public byte[] getFunctions(byte[] thriftGetFunctionsParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetFunctionsParams params = new TGetFunctionsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftGetFunctionsParams);
 
@@ -402,6 +419,7 @@ public class JniFrontend {
    */
   public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException,
       TException {
+    Preconditions.checkNotNull(frontend_);
     TCatalogObject objectDescription = new TCatalogObject();
     JniUtil.deserializeThrift(protocolFactory_, objectDescription, thriftParams);
     TSerializer serializer = new TSerializer(protocolFactory_);
@@ -416,6 +434,7 @@ public class JniFrontend {
    * @see Frontend#describeDb
    */
   public byte[] describeDb(byte[] thriftDescribeDbParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TDescribeDbParams params = new TDescribeDbParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftDescribeDbParams);
 
@@ -437,6 +456,7 @@ public class JniFrontend {
    * @see Frontend#describeTable
    */
   public byte[] describeTable(byte[] thriftDescribeTableParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TDescribeTableParams params = new TDescribeTableParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftDescribeTableParams);
 
@@ -464,6 +484,7 @@ public class JniFrontend {
    */
   public String showCreateTable(byte[] thriftTableName)
       throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TTableName params = new TTableName();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftTableName);
     return ToSqlUtils.getCreateTableSql(frontend_.getCatalog().getTable(
@@ -475,6 +496,7 @@ public class JniFrontend {
    */
   public String showCreateFunction(byte[] thriftShowCreateFunctionParams)
       throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TGetFunctionsParams params = new TGetFunctionsParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftShowCreateFunctionParams);
     Preconditions.checkArgument(params.category == TFunctionCategory.SCALAR ||
@@ -506,6 +528,7 @@ public class JniFrontend {
    * Gets all roles.
    */
   public byte[] getRoles(byte[] showRolesParams) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TShowRolesParams params = new TShowRolesParams();
     JniUtil.deserializeThrift(protocolFactory_, params, showRolesParams);
     TSerializer serializer = new TSerializer(protocolFactory_);
@@ -521,6 +544,7 @@ public class JniFrontend {
    */
   public byte[] getPrincipalPrivileges(byte[] showGrantPrincipalParams)
       throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TShowGrantPrincipalParams params = new TShowGrantPrincipalParams();
     JniUtil.deserializeThrift(protocolFactory_, params, showGrantPrincipalParams);
     TSerializer serializer = new TSerializer(protocolFactory_);
@@ -536,6 +560,7 @@ public class JniFrontend {
    */
   public byte[] execHiveServer2MetadataOp(byte[] metadataOpsParams)
       throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     TMetadataOpRequest params = new TMetadataOpRequest();
     JniUtil.deserializeThrift(protocolFactory_, params, metadataOpsParams);
     TResultSet result = frontend_.execHiveServer2MetadataOp(params);
@@ -549,10 +574,14 @@ public class JniFrontend {
   }
 
   public void setCatalogIsReady() {
+    Preconditions.checkNotNull(frontend_);
     frontend_.getCatalog().setIsReady(true);
   }
 
-  public void waitForCatalog() { frontend_.waitForCatalog(); }
+  public void waitForCatalog() {
+    Preconditions.checkNotNull(frontend_);
+    frontend_.waitForCatalog();
+  }
 
   // Caching this saves ~50ms per call to getHadoopConfigAsHtml
   private static final Configuration CONF = new Configuration();
@@ -629,6 +658,7 @@ public class JniFrontend {
    * @param serializedRequest
    */
   public void callQueryCompleteHooks(byte[] serializedRequest) throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
     final TQueryCompleteContext request = new TQueryCompleteContext();
     JniUtil.deserializeThrift(protocolFactory_, request, serializedRequest);
 
@@ -643,6 +673,7 @@ public class JniFrontend {
    * @throws TransactionException
    */
   public void abortTransaction(long transactionId) throws TransactionException {
+    Preconditions.checkNotNull(frontend_);
     this.frontend_.abortTransaction(transactionId);
   }
 
@@ -651,6 +682,7 @@ public class JniFrontend {
    * @param transactionId the id of the transaction to clear.
    */
   public void unregisterTransaction(long transactionId) {
+    Preconditions.checkNotNull(frontend_);
     this.frontend_.unregisterTransaction(transactionId);
   }
 
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 613fcf9..b63b1f7 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -24,6 +24,8 @@ import time
 from subprocess import check_call
 from tests.util.filesystem_utils import get_fs_path
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import (SkipIf, SkipIfS3, SkipIfABFS, SkipIfADLS,
+    SkipIfIsilon, SkipIfLocal)
 
 LOG = logging.getLogger('test_coordinators')
 LOG.setLevel(level=logging.DEBUG)
@@ -310,3 +312,19 @@ class TestCoordinators(CustomClusterTestSuite):
                          "functional.alltypes b on a.id = b.id;")
     num_hosts = "hosts=10 instances=10"
     assert num_hosts in str(ret)
+
+  @SkipIfS3.hbase
+  @SkipIfABFS.hbase
+  @SkipIfADLS.hbase
+  @SkipIfIsilon.hbase
+  @SkipIfLocal.hbase
+  @SkipIf.skip_hbase
+  @pytest.mark.execute_serially
+  def test_executor_only_hbase(self):
+    """Verifies HBase tables can be scanned by executor only impalads."""
+    self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
+                         use_exclusive_coordinators=True)
+    client = self.cluster.impalads[0].service.create_beeswax_client()
+    query = "select count(*) from functional_hbase.alltypes"
+    result = self.execute_query_expect_success(client, query)
+    assert result.data == ['7300']


[impala] 02/03: IMPALA-9176: shared null-aware anti-join build

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b2d9901fb88703667c59aed73193196947be521d
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu Mar 26 15:09:49 2020 -0700

    IMPALA-9176: shared null-aware anti-join build
    
    This switches null-aware anti-join (NAAJ) to use shared
    join builds with mt_dop > 0. To support this, we
    make all access to the join build data structures
    from the probe side read-only. NAAJ requires iterating
    over rows from build partitions at various steps
    in the algorithm, and before this patch that iteration
    was not thread-safe. Previously we avoided the problem
    by having a separate builder for each join node and
    duplicating the data.
    
    The main challenge was iteration over
    null_aware_partition()->build_rows() from the probe
    side, because it uses an embedded iterator in the
    stream and so was not thread-safe (each thread
    would be trying to use the same iterator).
    
    The solution is to extend BufferedTupleStream to
    allow multiple read iterators into a pinned,
    read-only stream. Each probe thread can then
    iterate over the stream independently with no
    thread-safety issues.
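
    As an illustration only (a minimal standalone sketch with made-up
    types, not the actual BufferedTupleStream API): once writes are
    finished and the stream is pinned, each probe thread owns its own
    read cursor, so no shared state is mutated during the probe phase.

      #include <cstddef>
      #include <cstdint>
      #include <iostream>
      #include <thread>
      #include <vector>

      // Stand-in for a pinned, read-only stream of build rows.
      struct FrozenStream {
        std::vector<int64_t> rows;  // Fully populated before any reader starts.
      };

      // Each reader keeps its own position; the stream itself is never mutated.
      struct ReadIterator {
        explicit ReadIterator(const FrozenStream* s) : stream(s) {}
        bool GetNext(int64_t* row) {
          if (pos >= stream->rows.size()) return false;
          *row = stream->rows[pos++];
          return true;
        }
        const FrozenStream* stream;
        std::size_t pos = 0;
      };

      int main() {
        FrozenStream stream;
        for (int64_t i = 0; i < 1000; ++i) stream.rows.push_back(i);
        std::vector<std::thread> probe_threads;
        std::vector<int64_t> sums(4, 0);
        for (int t = 0; t < 4; ++t) {
          probe_threads.emplace_back([&stream, &sums, t]() {
            ReadIterator it(&stream);  // Independent read position per thread.
            int64_t row;
            while (it.GetNext(&row)) sums[t] += row;
          });
        }
        for (std::thread& t : probe_threads) t.join();
        // Every thread read the full stream independently: each sum is 499500.
        for (int64_t s : sums) std::cout << s << std::endl;
        return 0;
      }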
    
    In the BufferedTupleStream changes, I partially abstracted
    ReadIterator from the rest of BufferedTupleStream,
    but decided against a complete refactor so that this patchset
    didn't cause excessive churn; i.e., much BufferedTupleStream
    code still accesses internal fields of ReadIterator.
    
    Fix a pre-existing bug in grouping-aggregator where
    Spill() hit a DCHECK because the hash table was
    destroyed unnecessarily when its initialization hit an OOM.
    This was flushed out by the parameter change in test_spilling.
    
    Testing:
    Added a test to buffered-tuple-stream-test for multiple
    readers of a BTS.
    
    Tweaked test_spilling_naaj_no_deny_reservation to have
    a smaller minimum reservation, required to keep the
    test passing with the new, lower memory requirement.
    
    Updated a TPC-H planner test where resource requirements
    slightly decreased for the NAAJ.
    
    Ran the naaj tests in test_spilling.py with TSAN enabled,
    confirmed no data races.
    
    Ran exhaustive tests, which passed after fixing IMPALA-9611.
    
    Ran core tests with ASAN.
    
    Ran backend tests with TSAN.
    
    Perf:
    I ran this query that exercises EvaluateNullProbe() heavily.
    
      select l_orderkey, l_partkey, l_suppkey, l_linenumber
      from tpch30_parquet.lineitem
      where l_suppkey = 4162 and l_shipmode = 'AIR'
            and l_returnflag = 'A' and l_shipdate > '1993-01-01'
            and if(l_orderkey > 5500000, NULL, l_orderkey) not in (
              select if(o_orderkey % 2 = 0, NULL, o_orderkey + 1)
              from orders
              where l_orderkey = o_orderkey)
      order by 1,2,3,4;
    
    It went from ~13s to ~11s running on a single impalad with
    this change, because of the inlining of CreateOutputRow() and
    EvalConjuncts().
    
    I also ran TPC-H SF 30 on Parquet with mt_dop=4, and there was
    no change in performance.
    
    Change-Id: I95ead761430b0aa59a4fb2e7848e47d1bf73c1c9
    Reviewed-on: http://gerrit.cloudera.org:8080/15612
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/blocking-join-node.cc                  |  15 -
 be/src/exec/blocking-join-node.h                   |   3 +-
 be/src/exec/blocking-join-node.inline.h            |  43 +++
 be/src/exec/data-source-scan-node.cc               |   1 +
 be/src/exec/exec-node.cc                           |   8 -
 be/src/exec/exec-node.h                            |   2 +-
 be/src/exec/exec-node.inline.h                     |  35 ++
 be/src/exec/grouping-aggregator-partition.cc       |   2 +-
 be/src/exec/grouping-aggregator.cc                 |   1 +
 be/src/exec/grouping-aggregator.h                  |   3 +-
 be/src/exec/hbase-scan-node.cc                     |   1 +
 be/src/exec/hdfs-avro-scanner-ir.cc                |   1 +
 be/src/exec/hdfs-columnar-scanner-ir.cc            |   1 +
 be/src/exec/hdfs-orc-scanner.cc                    |   1 +
 be/src/exec/hdfs-rcfile-scanner.cc                 |   1 +
 be/src/exec/hdfs-scanner.cc                        |   3 +-
 be/src/exec/hdfs-scanner.h                         |   1 +
 be/src/exec/hdfs-text-scanner.cc                   |   1 +
 be/src/exec/kudu-scanner.cc                        |   1 +
 be/src/exec/nested-loop-join-node.cc               |   2 +
 be/src/exec/non-grouping-aggregator.cc             |   1 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |   1 +
 be/src/exec/partitioned-hash-join-builder.cc       |  75 +++-
 be/src/exec/partitioned-hash-join-builder.h        |  40 ++-
 be/src/exec/partitioned-hash-join-node-ir.cc       |   2 +
 be/src/exec/partitioned-hash-join-node.cc          | 145 +++-----
 be/src/exec/partitioned-hash-join-node.h           |  15 +-
 be/src/exec/select-node-ir.cc                      |   2 +
 be/src/exec/unnest-node.cc                         |   1 +
 be/src/runtime/buffered-tuple-stream-test.cc       |  94 ++++-
 be/src/runtime/buffered-tuple-stream.cc            | 397 +++++++++++----------
 be/src/runtime/buffered-tuple-stream.h             | 267 ++++++++++----
 be/src/runtime/buffered-tuple-stream.inline.h      |  36 ++
 be/src/runtime/bufferpool/buffer-pool-internal.h   |   8 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc      |   2 +-
 be/src/runtime/bufferpool/buffer-pool.cc           |  24 +-
 be/src/runtime/bufferpool/buffer-pool.h            |  17 +-
 be/src/util/debug-util.cc                          |   1 +
 be/src/util/debug-util.h                           |   1 +
 common/thrift/generate_error_codes.py              |   5 +
 .../org/apache/impala/planner/HashJoinNode.java    |   1 -
 .../java/org/apache/impala/planner/JoinNode.java   |   2 -
 .../queries/PlannerTest/tpch-all.test              |   4 +-
 .../spilling-naaj-no-deny-reservation.test         |   1 +
 .../QueryTest/spilling-no-debug-action.test        |   3 +-
 tests/query_test/test_spilling.py                  |   2 +-
 46 files changed, 834 insertions(+), 439 deletions(-)

diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 7447c93..320e5d4 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -389,18 +389,3 @@ int64_t BlockingJoinNode::LocalTimeCounterFn(const RuntimeProfile::Counter* tota
   // Don't return a negative number in those cases.
   return ::max<int64_t>(0, local_time);
 }
-
-// This function is replaced by codegen
-void BlockingJoinNode::CreateOutputRow(TupleRow* out, TupleRow* probe, TupleRow* build) {
-  uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out);
-  if (probe == NULL) {
-    memset(out_ptr, 0, probe_tuple_row_size_);
-  } else {
-    memcpy(out_ptr, probe, probe_tuple_row_size_);
-  }
-  if (build == NULL) {
-    memset(out_ptr + probe_tuple_row_size_, 0, build_tuple_row_size_);
-  } else {
-    memcpy(out_ptr + probe_tuple_row_size_, build, build_tuple_row_size_);
-  }
-}
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index eb4f53d..135cd8e 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -197,7 +197,8 @@ class BlockingJoinNode : public ExecNode {
   /// Write combined row, consisting of the left child's 'probe_row' and right child's
   /// 'build_row' to 'out_row'.
   /// This is replaced by codegen.
-  void CreateOutputRow(TupleRow* out_row, TupleRow* probe_row, TupleRow* build_row);
+  inline void CreateOutputRow(
+      TupleRow* out_row, TupleRow* probe_row, TupleRow* build_row);
 
   /// This function calculates the "local time" spent in the join node.
   ///
diff --git a/be/src/exec/blocking-join-node.inline.h b/be/src/exec/blocking-join-node.inline.h
new file mode 100644
index 0000000..f283878
--- /dev/null
+++ b/be/src/exec/blocking-join-node.inline.h
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstring>
+
+#include "codegen/impala-ir.h"
+#include "exec/blocking-join-node.h"
+
+namespace impala {
+
+// This function is replaced by codegen.
+// It is an inline function to reduce function call overhead for interpreted code.
+inline void IR_NO_INLINE BlockingJoinNode::CreateOutputRow(
+    TupleRow* out_row, TupleRow* probe_row, TupleRow* build_row) {
+  uint8_t* out_ptr = reinterpret_cast<uint8_t*>(out_row);
+  if (probe_row == nullptr) {
+    memset(out_ptr, 0, probe_tuple_row_size_);
+  } else {
+    memcpy(out_ptr, probe_row, probe_tuple_row_size_);
+  }
+  if (build_row == nullptr) {
+    memset(out_ptr + probe_tuple_row_size_, 0, build_tuple_row_size_);
+  } else {
+    memcpy(out_ptr + probe_tuple_row_size_, build_row, build_tuple_row_size_);
+  }
+}
+}
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index f4622d6..d2601e7 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -20,6 +20,7 @@
 #include <vector>
 #include <gutil/strings/substitute.h>
 
+#include "exec/exec-node.inline.h"
 #include "exec/exec-node-util.h"
 #include "exec/parquet/parquet-common.h"
 #include "exec/read-write-util.h"
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index d128631..d4a7d91 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -478,14 +478,6 @@ bool ExecNode::CheckLimitAndTruncateRowBatchIfNeededShared(
   return reached_limit;
 }
 
-bool ExecNode::EvalConjuncts(
-    ScalarExprEvaluator* const* evals, int num_conjuncts, TupleRow* row) {
-  for (int i = 0; i < num_conjuncts; ++i) {
-    if (!evals[i]->EvalPredicate(row)) return false;
-  }
-  return true;
-}
-
 Status ExecNode::QueryMaintenance(RuntimeState* state) {
   expr_results_pool_->Clear();
   return state->CheckQueryState();
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 998e19f..0976907 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -236,7 +236,7 @@ class ExecNode {
 
   /// Evaluate the conjuncts in 'evaluators' over 'row'.
   /// Returns true if all exprs return true.
-  static bool EvalConjuncts(
+  static inline bool EvalConjuncts(
       ScalarExprEvaluator* const* evals, int num_conjuncts, TupleRow* row);
 
   /// Codegen EvalConjuncts(). Returns a non-OK status if the function couldn't be
diff --git a/be/src/exec/exec-node.inline.h b/be/src/exec/exec-node.inline.h
new file mode 100644
index 0000000..b5edb8c
--- /dev/null
+++ b/be/src/exec/exec-node.inline.h
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "codegen/impala-ir.h"
+#include "exec/exec-node.h"
+#include "exprs/scalar-expr-evaluator.h"
+
+namespace impala {
+
+// This function is replaced by codegen.
+// It is an inline function to reduce function call overhead for interpreted code.
+inline bool IR_NO_INLINE ExecNode::EvalConjuncts(
+    ScalarExprEvaluator* const* evals, int num_conjuncts, TupleRow* row) {
+  for (int i = 0; i < num_conjuncts; ++i) {
+    if (!evals[i]->EvalPredicate(row)) return false;
+  }
+  return true;
+}
+}
diff --git a/be/src/exec/grouping-aggregator-partition.cc b/be/src/exec/grouping-aggregator-partition.cc
index 10b24c5..784795d 100644
--- a/be/src/exec/grouping-aggregator-partition.cc
+++ b/be/src/exec/grouping-aggregator-partition.cc
@@ -90,7 +90,7 @@ Status GroupingAggregator::Partition::InitHashTable(bool* got_memory) {
   // Please update the error message in CreateHashPartitions() if initial size of
   // hash table changes.
   Status status = hash_tbl->Init(got_memory);
-  if (!status.ok() || !(*got_memory)) {
+  if (!status.ok()) {
     hash_tbl->Close();
     hash_tbl.reset();
   }
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index a66ed62..b73b6df 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -21,6 +21,7 @@
 
 #include "codegen/llvm-codegen.h"
 #include "exec/exec-node.h"
+#include "exec/exec-node.inline.h"
 #include "exec/hash-table.inline.h"
 #include "exprs/agg-fn-evaluator.h"
 #include "exprs/scalar-expr.h"
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index ffef305..6687950 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -423,8 +423,7 @@ class GroupingAggregator : public Aggregator {
 
     /// Initializes the hash table. 'aggregated_row_stream' must be non-NULL.
     /// Sets 'got_memory' to true if the hash table was initialised or false on OOM.
-    /// After returning, 'hash_tbl' will be non-null iff 'got_memory' is true and the
-    /// returned status is OK.
+    /// After returning, 'hash_tbl' will be non-null iff the returned status is OK.
     Status InitHashTable(bool* got_memory) WARN_UNUSED_RESULT;
 
     /// Called in case we need to serialize aggregated rows. This step effectively does
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index 9ec5170..b00929d 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 
+#include "exec/exec-node.inline.h"
 #include "exec/exec-node-util.h"
 #include "exec/text-converter.inline.h"
 #include "gen-cpp/PlanNodes_types.h"
diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc
index 5e2599d..d61e135 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -18,6 +18,7 @@
 #include <algorithm>
 #include <limits>
 
+#include "exec/exec-node.inline.h"
 #include "exec/hdfs-avro-scanner.h"
 #include "exec/read-write-util.h"
 #include "runtime/date-value.h"
diff --git a/be/src/exec/hdfs-columnar-scanner-ir.cc b/be/src/exec/hdfs-columnar-scanner-ir.cc
index 2c2a6a6..a9bd48f 100644
--- a/be/src/exec/hdfs-columnar-scanner-ir.cc
+++ b/be/src/exec/hdfs-columnar-scanner-ir.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "exec/exec-node.inline.h"
 #include "exec/hdfs-columnar-scanner.h"
 #include "runtime/row-batch.h"
 #include "exec/scratch-tuple-batch.h"
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 7967cb2..81df066 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -19,6 +19,7 @@
 
 #include <queue>
 
+#include "exec/exec-node.inline.h"
 #include "exec/orc-column-readers.h"
 #include "exec/scanner-context.inline.h"
 #include "exec/scratch-tuple-batch.h"
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index dba8f51..b59da3a 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -19,6 +19,7 @@
 
 #include <boost/algorithm/string.hpp>
 
+#include "exec/exec-node.inline.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/hdfs-sequence-scanner.h"
 #include "exec/scanner-context.inline.h"
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index aa37ffa..5b21c3e 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -19,10 +19,11 @@
 
 #include "codegen/codegen-anyval.h"
 #include "exec/base-sequence-scanner.h"
-#include "exec/text-converter.h"
+#include "exec/exec-node.inline.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/hdfs-scan-node-mt.h"
 #include "exec/read-write-util.h"
+#include "exec/text-converter.h"
 #include "exec/text-converter.inline.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "runtime/collection-value-builder.h"
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 18cd84b..68b413b 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -28,6 +28,7 @@
 #include "common/global-flags.h"
 #include "common/object-pool.h"
 #include "common/status.h"
+#include "exec/exec-node.inline.h"
 #include "exec/hdfs-scan-node-base.h"
 #include "exec/scanner-context.h"
 #include "runtime/io/disk-io-mgr.h"
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 78355dd..9ae980e 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -31,6 +31,7 @@
 #include "common/logging.h"
 #include "exec/delimited-text-parser.h"
 #include "exec/delimited-text-parser.inline.h"
+#include "exec/exec-node.inline.h"
 #include "exec/hdfs-plugin-text-scanner.h"
 #include "exec/hdfs-scan-node-base.h"
 #include "exec/hdfs-scan-node.h"
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index b3a5e68..f2e9eea 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -23,6 +23,7 @@
 #include <kudu/client/value.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
+#include "exec/exec-node.inline.h"
 #include "exec/kudu-util.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 185e241..4804fc2 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -21,6 +21,8 @@
 #include <gutil/strings/substitute.h>
 
 #include "common/names.h"
+#include "exec/blocking-join-node.inline.h"
+#include "exec/exec-node.inline.h"
 #include "exec/exec-node-util.h"
 #include "exec/join-op.h"
 #include "exprs/scalar-expr-evaluator.h"
diff --git a/be/src/exec/non-grouping-aggregator.cc b/be/src/exec/non-grouping-aggregator.cc
index 424143e..43b38a8 100644
--- a/be/src/exec/non-grouping-aggregator.cc
+++ b/be/src/exec/non-grouping-aggregator.cc
@@ -21,6 +21,7 @@
 
 #include "codegen/llvm-codegen.h"
 #include "exec/exec-node.h"
+#include "exec/exec-node.inline.h"
 #include "exprs/agg-fn-evaluator.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/descriptors.h"
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 1fdffaf..f16f5c4 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -25,6 +25,7 @@
 #include <gutil/strings/substitute.h>
 
 #include "codegen/codegen-anyval.h"
+#include "exec/exec-node.inline.h"
 #include "exec/hdfs-scan-node.h"
 #include "exec/parquet/parquet-collection-column-reader.h"
 #include "exec/parquet/parquet-column-readers.h"
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 6666ede..5ed95bd 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -37,7 +37,9 @@
 #include "runtime/runtime-state.h"
 #include "util/bloom-filter.h"
 #include "util/cyclic-barrier.h"
+#include "util/debug-util.h"
 #include "util/min-max-filter.h"
+#include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/PlanNodes_types.h"
@@ -339,6 +341,7 @@ Status PhjBuilder::FinalizeBuild(RuntimeState* state) {
   int64_t num_build_rows = 0;
   for (const unique_ptr<Partition>& partition : hash_partitions_) {
     num_build_rows += partition->build_rows()->num_rows();
+    partition->build_rows()->DoneWriting();
   }
 
   if (num_build_rows > 0) {
@@ -371,12 +374,17 @@ Status PhjBuilder::FinalizeBuild(RuntimeState* state) {
   if (ht_ctx_->level() == 0) {
     PublishRuntimeFilters(num_build_rows);
     non_empty_build_ |= (num_build_rows > 0);
-  }
 
-  if (null_aware_partition_ != nullptr && null_aware_partition_->is_spilled()) {
-    // Free up memory for the hash tables of other partitions by unpinning the
-    // last block of the null aware partition's stream.
-    RETURN_IF_ERROR(null_aware_partition_->Spill(BufferedTupleStream::UNPIN_ALL));
+    if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+      if (null_aware_partition_->is_spilled()) {
+        // Free up memory for the hash tables of other partitions by unpinning the
+        // last block of the null aware partition's stream.
+        RETURN_IF_ERROR(null_aware_partition_->Spill(BufferedTupleStream::UNPIN_ALL));
+      } else {
+        // Invalidate the write iterator so we can safely do concurrent reads later.
+        null_aware_partition_->build_rows()->DoneWriting();
+      }
+    }
   }
 
   HashJoinState next_state;
@@ -830,6 +838,53 @@ void PhjBuilder::CleanUpSinglePartition(
   spilled_partitions_.pop_back();
 }
 
+Status PhjBuilder::BeginNullAwareProbe() {
+  DCHECK(null_aware_partition_ != nullptr);
+  if (num_probe_threads_ > 1) {
+    return probe_barrier_->Wait([&]() {
+      return BeginNullAwareProbeSerial();
+    });
+  } else {
+    return BeginNullAwareProbeSerial();
+  }
+}
+
+Status PhjBuilder::BeginNullAwareProbeSerial() {
+  BufferedTupleStream* build_rows = null_aware_partition_->build_rows();
+  bool pinned;
+  RETURN_IF_ERROR(build_rows->PinStream(&pinned));
+  if (!pinned) {
+    return Status(TErrorCode::NAAJ_OUT_OF_MEMORY, build_rows->num_rows(),
+        PrettyPrinter::PrintBytes(build_rows->byte_size()),
+        PrettyPrinter::PrintBytes(
+            buffer_pool_client_->GetUnusedReservation() + build_rows->BytesPinned(false)),
+        PrettyPrinter::PrintBytes(buffer_pool_client_->GetReservation()));
+  }
+  return Status::OK();
+}
+
+Status PhjBuilder::DoneProbingNullAwarePartition() {
+  DCHECK(null_aware_partition_ != nullptr);
+  if (num_probe_threads_ > 1) {
+    RETURN_IF_ERROR(probe_barrier_->Wait([&]() {
+      CloseNullAwarePartition();
+      return Status::OK();
+    }));
+  } else {
+    CloseNullAwarePartition();
+  }
+  return Status::OK();
+}
+
+void PhjBuilder::CloseNullAwarePartition() {
+  if (null_aware_partition_ == nullptr) return;
+  // We don't need to pass in a batch because the anti-join only returns tuple data
+  // from the probe side - i.e. the RowDescriptor for PartitionedHashJoinNode does
+  // not include the build tuple.
+  null_aware_partition_->Close(nullptr);
+  null_aware_partition_.reset();
+}
+
 void PhjBuilder::CloseAndDeletePartitions(RowBatch* row_batch) {
   // Close all the partitions and clean up all references to them.
   for (unique_ptr<Partition>& partition : hash_partitions_) {
@@ -1015,7 +1070,9 @@ Status PhjBuilder::BeginSpilledProbeSerial() {
           << " for probe clients.";
   // All reservation should be available for repartitioning.
   DCHECK_EQ(0, probe_stream_reservation_.GetReservation());
-  DCHECK_EQ(0, buffer_pool_client_->GetUsedReservation());
+  DCHECK(buffer_pool_client_->GetUsedReservation() == 0
+        || (!is_separate_build_ && join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN))
+      << "Only NAAJ probe streams should be consuming memory" << DebugString();
 
   DCHECK_EQ(partition->build_rows()->BytesPinned(false), 0) << DebugString();
   int64_t num_input_rows = partition->build_rows()->num_rows();
@@ -1299,7 +1356,10 @@ void PhjBuilderConfig::Codegen(FragmentState* state) {
 
 string PhjBuilder::DebugString() const {
   stringstream ss;
-  ss << " PhjBuilder state=" << PrintState(state_)
+  ss << " PhjBuilder op=" << PrintThriftEnum(join_op_)
+     << " is_separate_build=" << is_separate_build_
+     << " num_probe_threads=" << num_probe_threads_
+     << " state=" << PrintState(state_)
      << " Hash partitions: " << hash_partitions_.size() << ":" << endl;
   for (int i = 0; i < hash_partitions_.size(); ++i) {
     ss << " Hash partition " << i << " " << hash_partitions_[i]->DebugString() << endl;
@@ -1312,6 +1372,7 @@ string PhjBuilder::DebugString() const {
   if (null_aware_partition_ != nullptr) {
     ss << "Null-aware partition: " << null_aware_partition_->DebugString();
   }
+  ss << " buffer_pool_client=" << buffer_pool_client_->DebugString();
   return ss.str();
 }
 
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index d3ee104..01d0f03 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -384,17 +384,22 @@ class PhjBuilder : public JoinBuilder {
       RuntimeProfile* probe_profile,
       std::deque<std::unique_ptr<Partition>>* output_partitions, RowBatch* batch);
 
-  /// Close the null aware partition (if there is one) and set it to NULL.
-  /// TODO: IMPALA-9176: improve the encapsulation of the null-aware partition.
-  void CloseNullAwarePartition() {
-    if (null_aware_partition_ != nullptr) {
-      // We don't need to pass in a batch because the anti-join only returns tuple data
-      // from the probe side - i.e. the RowDescriptor for PartitionedHashJoinNode does
-      // not include the build tuple.
-      null_aware_partition_->Close(nullptr);
-      null_aware_partition_.reset();
-    }
-  }
+  /// Called to begin probing of the null-aware partition, after all other partitions
+  /// have been fully processed. This should only be called if there are build rows in the
+  /// null-aware partition. This pins the null-aware build rows in memory and allows all
+  /// probe threads to access those rows in a read-only manner.
+  ///
+  /// Returns an error if an error was encountered or if the query was cancelled.
+  ///
+  /// This is a synchronization point for shared join build. All probe threads must
+  /// call this function before continuing the next phase of the hash join algorithm.
+  Status BeginNullAwareProbe();
+
+  /// Called after probing of the null-aware build partition is complete.
+  ///
+  /// This is a synchronization point for shared join build. All probe threads must
+  /// call this function before continuing the next phase of the hash join algorithm.
+  Status DoneProbingNullAwarePartition();
 
   /// True if the hash table may contain rows with one or more NULL join keys. This
   /// depends on the join type, passed in via 'join_op' and the 'is_not_distinct_from'
@@ -412,8 +417,9 @@ class PhjBuilder : public JoinBuilder {
   /// Safe to call from PartitionedHashJoinNode threads during the probe phase.
   HashJoinState state() const { return state_; }
 
-  /// Accessor to allow PartitionedHashJoinNode to access null_aware_partition_.
-  /// TODO: IMPALA-9176: improve the encapsulation of the null-aware partition.
+  /// Accessor to allow PartitionedHashJoinNode to access 'null_aware_partition_'.
+  /// Generally the PartitionedHashJoinNode should only access this partition in
+  /// a read-only manner.
   inline Partition* null_aware_partition() const { return null_aware_partition_.get(); }
 
   /// Thread-safe.
@@ -679,6 +685,12 @@ class PhjBuilder : public JoinBuilder {
   void CleanUpSinglePartition(
       std::deque<std::unique_ptr<Partition>>* output_partitions, RowBatch* batch);
 
+  /// The serial part of BeginNullAwareProbe() that is executed by a single thread.
+  Status BeginNullAwareProbeSerial();
+
+  /// Close the null aware partition (if there is one) and set it to NULL.
+  void CloseNullAwarePartition();
+
   /// Calls Close() on every Partition, deletes them, and cleans up any pointers that
   /// may reference them. If 'row_batch' if not NULL, transfers the ownership of all
   /// row-backing resources to it.
@@ -858,7 +870,7 @@ class PhjBuilder : public JoinBuilder {
   const PhjBuilderConfig::ProcessBuildBatchFn& process_build_batch_fn_level0_;
 
   /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
-  const Partition::InsertBatchFn& insert_batch_fn_ ;
+  const Partition::InsertBatchFn& insert_batch_fn_;
   const Partition::InsertBatchFn& insert_batch_fn_level0_;
 };
 } // namespace impala
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index 81052d1..2aa81b8 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -18,6 +18,8 @@
 #include "exec/partitioned-hash-join-node.inline.h"
 
 #include "codegen/impala-ir.h"
+#include "exec/blocking-join-node.inline.h"
+#include "exec/exec-node.inline.h"
 #include "exec/hash-table.inline.h"
 #include "runtime/row-batch.h"
 
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 76fd3a3..a6f979c 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -23,6 +23,8 @@
 #include <gutil/strings/substitute.h>
 
 #include "codegen/llvm-codegen.h"
+#include "exec/blocking-join-node.inline.h"
+#include "exec/exec-node.inline.h"
 #include "exec/exec-node-util.h"
 #include "exec/hash-table.inline.h"
 #include "exprs/scalar-expr-evaluator.h"
@@ -34,7 +36,6 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "util/debug-util.h"
-#include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/PlanNodes_types.h"
@@ -122,7 +123,7 @@ PartitionedHashJoinNode::PartitionedHashJoinNode(RuntimeState* state,
 
 PartitionedHashJoinNode::~PartitionedHashJoinNode() {
   // Check that we didn't leak any memory.
-  DCHECK(null_probe_rows_ == NULL);
+  DCHECK(null_probe_rows_ == nullptr);
 }
 
 Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
@@ -214,7 +215,7 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
   ResetForProbe();
   probe_state_ = ProbeState::PROBING_IN_BATCH;
-  DCHECK(null_aware_probe_partition_ == NULL
+  DCHECK(null_aware_probe_partition_ == nullptr
       || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
   return Status::OK();
 }
@@ -261,16 +262,16 @@ Status PartitionedHashJoinNode::Reset(RuntimeState* state, RowBatch* row_batch)
 void PartitionedHashJoinNode::CloseAndDeletePartitions(RowBatch* row_batch) {
   // Close all the partitions and clean up all references to them.
   for (unique_ptr<ProbePartition>& partition : probe_hash_partitions_) {
-    if (partition != NULL) partition->Close(row_batch);
+    if (partition != nullptr) partition->Close(row_batch);
   }
   probe_hash_partitions_.clear();
   for (auto& entry : spilled_partitions_) entry.second->Close(row_batch);
   spilled_partitions_.clear();
-  if (input_partition_ != NULL) {
+  if (input_partition_ != nullptr) {
     input_partition_->Close(row_batch);
     input_partition_.reset();
   }
-  if (null_aware_probe_partition_ != NULL) {
+  if (null_aware_probe_partition_ != nullptr) {
     null_aware_probe_partition_->Close(row_batch);
     null_aware_probe_partition_.reset();
   }
@@ -278,7 +279,7 @@ void PartitionedHashJoinNode::CloseAndDeletePartitions(RowBatch* row_batch) {
     partition->Close(row_batch);
   }
   output_build_partitions_.clear();
-  if (null_probe_rows_ != NULL) {
+  if (null_probe_rows_ != nullptr) {
     null_probe_rows_->Close(row_batch, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
     null_probe_rows_.reset();
   }
@@ -341,7 +342,7 @@ Status PartitionedHashJoinNode::ProbePartition::PrepareForRead() {
 
 void PartitionedHashJoinNode::ProbePartition::Close(RowBatch* batch) {
   if (IsClosed()) return;
-  if (probe_rows_ != NULL) {
+  if (probe_rows_ != nullptr) {
     // Flush out the resources to free up memory for subsequent partitions.
     probe_rows_->Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES);
     probe_rows_.reset();
@@ -385,7 +386,7 @@ Status PartitionedHashJoinNode::NextProbeRowBatchFromChild(
       return Status::OK();
     }
     if (probe_side_eos_) {
-      current_probe_row_ = NULL;
+      current_probe_row_ = nullptr;
       probe_batch_pos_ = -1;
       *eos = true;
       return Status::OK();
@@ -400,7 +401,7 @@ Status PartitionedHashJoinNode::NextProbeRowBatchFromChild(
 
 Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
     RuntimeState* state, RowBatch* out_batch, bool* eos) {
-  DCHECK(input_partition_ != NULL);
+  DCHECK(input_partition_ != nullptr);
   DCHECK(builder_->state() == HashJoinState::PROBING_SPILLED_PARTITION
       || builder_->state() == HashJoinState::REPARTITIONING_PROBE);
   DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_END_BATCH);
@@ -473,11 +474,11 @@ Status PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch) {
   Status status;
   TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
   SCOPED_TIMER(probe_timer_);
-  if (process_probe_batch_fn_ == NULL) {
+  if (process_probe_batch_fn_ == nullptr) {
     rows_added = ProcessProbeBatch(
         join_op_, prefetch_mode, out_batch, ht_ctx_.get(), &status);
   } else {
-    DCHECK(process_probe_batch_fn_level0_ != NULL);
+    DCHECK(process_probe_batch_fn_level0_ != nullptr);
     if (ht_ctx_->level() == 0) {
       rows_added = process_probe_batch_fn_level0_(
           this, prefetch_mode, out_batch, ht_ctx_.get(), &status);
@@ -616,12 +617,11 @@ Status PartitionedHashJoinNode::GetNext(
           // Move to the next spilled partition.
           RETURN_IF_ERROR(BeginSpilledProbe());
           probe_state_ = ProbeState::PROBING_END_BATCH;
-        } else if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
-            && builder_->null_aware_partition() != nullptr) {
+        } else if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
           // Null aware anti join outputs additional rows after all the probe input,
           // including spilled partitions, is processed.
           bool has_null_aware_rows;
-          RETURN_IF_ERROR(PrepareNullAwarePartition(&has_null_aware_rows));
+          RETURN_IF_ERROR(BeginNullAwareProbe(&has_null_aware_rows));
           probe_state_ = has_null_aware_rows ? ProbeState::OUTPUTTING_NULL_AWARE :
                                                ProbeState::OUTPUTTING_NULL_PROBE;
         } else {
@@ -632,7 +632,6 @@ Status PartitionedHashJoinNode::GetNext(
       }
       case ProbeState::OUTPUTTING_NULL_AWARE: {
         DCHECK_ENUM_EQ(join_op_, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
-        DCHECK(builder_->null_aware_partition() != nullptr);
         DCHECK(null_aware_probe_partition_ != nullptr);
         bool napr_eos;
         RETURN_IF_ERROR(OutputNullAwareProbeRows(state, out_batch, &napr_eos));
@@ -641,7 +640,6 @@ Status PartitionedHashJoinNode::GetNext(
       }
       case ProbeState::OUTPUTTING_NULL_PROBE: {
         DCHECK_ENUM_EQ(join_op_, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
-        DCHECK(builder_->null_aware_partition() != nullptr);
         DCHECK_GE(null_probe_output_idx_, 0);
         bool nanp_done;
         RETURN_IF_ERROR(OutputNullAwareNullProbe(state, out_batch, &nanp_done));
@@ -653,7 +651,6 @@ Status PartitionedHashJoinNode::GetNext(
         DCHECK(probe_side_eos_);
         DCHECK(output_build_partitions_.empty());
         DCHECK(spilled_partitions_.empty());
-        DCHECK(builder_->null_aware_partition() == nullptr);
         DCHECK(null_aware_probe_partition_ == nullptr);
         *eos = true;
         break;
@@ -687,7 +684,7 @@ Status PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
   DCHECK(!output_build_partitions_.empty());
   DCHECK_ENUM_EQ(probe_state_, ProbeState::OUTPUTTING_UNMATCHED);
 
-  if (output_unmatched_batch_iter_.get() != NULL) {
+  if (output_unmatched_batch_iter_.get() != nullptr) {
     // There were no probe rows so we skipped building the hash table. In this case, all
     // build rows of the partition are unmatched.
     RETURN_IF_ERROR(OutputAllBuild(out_batch));
@@ -792,11 +789,11 @@ void PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_ba
 
 void PartitionedHashJoinNode::OutputBuildRow(
     RowBatch* out_batch, TupleRow* build_row, RowBatch::Iterator* out_batch_iterator) {
-  DCHECK(build_row != NULL);
+  DCHECK(build_row != nullptr);
   if (join_op_ == TJoinOp::RIGHT_ANTI_JOIN) {
     out_batch->CopyRow(build_row, out_batch_iterator->Get());
   } else {
-    CreateOutputRow(out_batch_iterator->Get(), NULL, build_row);
+    CreateOutputRow(out_batch_iterator->Get(), nullptr, build_row);
   }
 }
 
@@ -817,8 +814,7 @@ Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
 Status PartitionedHashJoinNode::OutputNullAwareNullProbe(
     RuntimeState* state, RowBatch* out_batch, bool* done) {
   DCHECK_ENUM_EQ(probe_state_, ProbeState::OUTPUTTING_NULL_PROBE);
-  DCHECK(builder_->null_aware_partition() != NULL);
-  DCHECK(null_aware_probe_partition_ != NULL);
+  DCHECK(null_aware_probe_partition_ != nullptr);
   DCHECK_NE(probe_batch_pos_, -1);
   *done = false;
 
@@ -829,13 +825,13 @@ Status PartitionedHashJoinNode::OutputNullAwareNullProbe(
     bool eos;
     RETURN_IF_ERROR(null_probe_rows_->GetNext(probe_batch_.get(), &eos));
     if (probe_batch_->num_rows() == 0 && eos) {
-      // All done.
-      builder_->CloseNullAwarePartition();
+      // All done outputting rows from the null-aware partition. Clean everything up.
       null_aware_probe_partition_->Close(out_batch);
       null_aware_probe_partition_.reset();
       // Flush out the resources to free up memory.
       null_probe_rows_->Close(out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
       null_probe_rows_.reset();
+      RETURN_IF_ERROR(builder_->DoneProbingNullAwarePartition());
       *done = true;
       return Status::OK();
     }
@@ -853,22 +849,6 @@ Status PartitionedHashJoinNode::OutputNullAwareNullProbe(
   return Status::OK();
 }
 
-// In this case we had a lot of NULLs on either the build/probe side. While this is
-// possible to process by re-reading the spilled streams for each row with minimal code
-// effort, this would behave very slowly (we'd need to do IO for each row). This seems
-// like a reasonable limitation for now.
-// TODO: revisit.
-Status PartitionedHashJoinNode::NullAwareAntiJoinError(BufferedTupleStream* rows) {
-  return Status(Substitute(
-      "Unable to perform Null-Aware Anti-Join. Could not get enough reservation to fit "
-      "all rows with NULLs from the build side in memory. Memory required for $0 rows "
-      "was $1. $2/$3 of the join's reservation was available for the rows.",
-      rows->num_rows(), PrettyPrinter::PrintBytes(rows->byte_size()),
-      PrettyPrinter::PrintBytes(
-          buffer_pool_client()->GetUnusedReservation() + rows->BytesPinned(false)),
-      PrettyPrinter::PrintBytes(buffer_pool_client()->GetReservation())));
-}
-
 Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
   null_aware_probe_partition_.reset(
       new ProbePartition(runtime_state_, this, builder_->null_aware_partition()));
@@ -890,17 +870,15 @@ Status PartitionedHashJoinNode::InitNullProbeRows() {
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::PrepareNullAwarePartition(bool* has_null_aware_rows) {
+Status PartitionedHashJoinNode::BeginNullAwareProbe(bool* has_null_aware_rows) {
   DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBE_COMPLETE);
-  DCHECK(builder_->null_aware_partition() != NULL);
-  DCHECK(null_aware_probe_partition_ != NULL);
+  DCHECK(builder_->null_aware_partition() != nullptr);
+  DCHECK(null_aware_probe_partition_ != nullptr);
   DCHECK_EQ(probe_batch_pos_, -1);
   DCHECK_EQ(probe_batch_->num_rows(), 0);
 
-  BufferedTupleStream* build_stream = builder_->null_aware_partition()->build_rows();
   BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
-
-  if (build_stream->num_rows() == 0) {
+  if (builder_->null_aware_partition()->build_rows()->num_rows() == 0) {
     // There were no build rows. Nothing to do. Just prepare to output the null
     // probe rows.
     DCHECK_EQ(probe_stream->num_rows(), 0);
@@ -909,11 +887,9 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition(bool* has_null_aware_r
     return Status::OK();
   }
 
-  bool pinned;
-  RETURN_IF_ERROR(build_stream->PinStream(&pinned));
-  if (!pinned) return NullAwareAntiJoinError(build_stream);
+  RETURN_IF_ERROR(builder_->BeginNullAwareProbe());
 
-  // Initialize the streams for read.
+  // Initialize the probe stream for reading.
   bool got_read_buffer;
   RETURN_IF_ERROR(probe_stream->PrepareForRead(true, &got_read_buffer));
   if (!got_read_buffer) {
@@ -928,12 +904,11 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition(bool* has_null_aware_r
 Status PartitionedHashJoinNode::OutputNullAwareProbeRows(
     RuntimeState* state, RowBatch* out_batch, bool* done) {
   DCHECK_ENUM_EQ(probe_state_, ProbeState::OUTPUTTING_NULL_AWARE);
-  DCHECK(builder_->null_aware_partition() != NULL);
-  DCHECK(null_aware_probe_partition_ != NULL);
+  DCHECK(null_aware_probe_partition_ != nullptr);
   *done = false;
   ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
   int num_join_conjuncts = other_join_conjuncts_.size();
-  DCHECK(probe_batch_ != NULL);
+  DCHECK(probe_batch_ != nullptr);
 
   BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
   if (probe_batch_pos_ == probe_batch_->num_rows()) {
@@ -954,21 +929,21 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(
     }
   }
 
+  RowBatch null_build_batch(&build_row_desc(), state->batch_size(), mem_tracker());
   // For each probe row, iterate over all the build rows and check for rows
   // that did not have any matches.
   for (; probe_batch_pos_ < probe_batch_->num_rows(); ++probe_batch_pos_) {
     if (out_batch->AtCapacity()) break;
     TupleRow* probe_row = probe_batch_->GetRow(probe_batch_pos_);
     bool matched = false;
-    bool got_reservation;
     BufferedTupleStream* null_build_stream =
         builder_->null_aware_partition()->build_rows();
-    RETURN_IF_ERROR(null_build_stream->PrepareForRead(false, &got_reservation));
-    DCHECK(got_reservation) << "Should have been pinned";
-    RowBatch null_build_batch(&build_row_desc(), state->batch_size(), mem_tracker());
+    DCHECK(null_build_stream->is_pinned());
+    BufferedTupleStream::ReadIterator build_itr;
+    RETURN_IF_ERROR(null_build_stream->PrepareForPinnedRead(&build_itr));
     bool eos;
     do {
-      RETURN_IF_ERROR(null_build_stream->GetNext(&null_build_batch, &eos));
+      RETURN_IF_ERROR(null_build_stream->GetNext(&build_itr, &null_build_batch, &eos));
       FOREACH_ROW(&null_build_batch, 0, iter) {
         CreateOutputRow(semi_join_staging_row_, probe_row, iter.Get());
         if (ExecNode::EvalConjuncts(
@@ -1031,14 +1006,14 @@ Status PartitionedHashJoinNode::PrepareForPartitionedProbe() {
         (*build_hash_partitions_.hash_partitions)[i].get();
     ProbePartition* probe_partition = probe_hash_partitions_[i].get();
     if (build_partition->IsClosed()) {
-      DCHECK(hash_tbls_[i] == NULL);
-      DCHECK(probe_partition == NULL);
+      DCHECK(hash_tbls_[i] == nullptr);
+      DCHECK(probe_partition == nullptr);
     } else if (build_partition->is_spilled()) {
-      DCHECK(hash_tbls_[i] == NULL);
-      DCHECK(probe_partition != NULL);
+      DCHECK(hash_tbls_[i] == nullptr);
+      DCHECK(probe_partition != nullptr);
     } else {
-      DCHECK(hash_tbls_[i] != NULL);
-      DCHECK(probe_partition == NULL);
+      DCHECK(hash_tbls_[i] != nullptr);
+      DCHECK(probe_partition == nullptr);
     }
   }
   return Status::OK();
@@ -1096,7 +1071,8 @@ bool PartitionedHashJoinNode::AppendProbeRowSlow(
 
 Status PartitionedHashJoinNode::EvaluateNullProbe(
     RuntimeState* state, BufferedTupleStream* build) {
-  if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
+  DCHECK(build->is_pinned());
+  if (null_probe_rows_ == nullptr || null_probe_rows_->num_rows() == 0) {
     return Status::OK();
   }
   DCHECK_EQ(null_probe_rows_->num_rows(), matched_null_probe_.size());
@@ -1107,10 +1083,7 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(
   ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
   int num_join_conjuncts = other_join_conjuncts_.size();
   RowBatch probe_batch(&probe_row_desc(), runtime_state_->batch_size(), mem_tracker());
-
-  bool pinned;
-  RETURN_IF_ERROR(build->PinStream(&pinned));
-  if (!pinned) return NullAwareAntiJoinError(build);
+  RowBatch build_batch(&build_row_desc(), state->batch_size(), mem_tracker());
 
   // For each probe row, iterate over all rows in the build table.
   SCOPED_TIMER(null_aware_eval_timer_);
@@ -1122,13 +1095,11 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(
       // This loop may run for a long time. Check for cancellation.
       RETURN_IF_CANCELLED(state);
       if (matched_null_probe_[probe_row_idx]) continue;
-      bool got_reservation;
-      RETURN_IF_ERROR(build->PrepareForRead(false, &got_reservation));
-      DCHECK(got_reservation) << "Should have been pinned";
-      RowBatch build_batch(&build_row_desc(), state->batch_size(), mem_tracker());
+      BufferedTupleStream::ReadIterator build_itr;
+      RETURN_IF_ERROR(build->PrepareForPinnedRead(&build_itr));
       bool build_eos;
       do {
-        RETURN_IF_ERROR(build->GetNext(&build_batch, &build_eos));
+        RETURN_IF_ERROR(build->GetNext(&build_itr, &build_batch, &build_eos));
         FOREACH_ROW(&build_batch, 0, iter) {
           CreateOutputRow(semi_join_staging_row_, probe_batch.GetRow(i), iter.Get());
           if (ExecNode::EvalConjuncts(
@@ -1240,7 +1211,7 @@ string PartitionedHashJoinNode::NodeDebugString() const {
   ss << "PartitionedHashJoinNode (id=" << id() << " op=" << join_op_
      << " #spilled_partitions=" << spilled_partitions_.size() << ")" << endl;
 
-  if (builder_ != NULL) {
+  if (builder_ != nullptr) {
     ss << "PhjBuilder: " << builder_->DebugString();
   }
 
@@ -1248,10 +1219,10 @@ string PartitionedHashJoinNode::NodeDebugString() const {
   for (int i = 0; i < probe_hash_partitions_.size(); ++i) {
     ProbePartition* probe_partition = probe_hash_partitions_[i].get();
     ss << "  Probe hash partition " << i << ": ";
-    if (probe_partition != NULL) {
+    if (probe_partition != nullptr) {
       ss << "probe ptr=" << probe_partition;
       BufferedTupleStream* probe_rows = probe_partition->probe_rows();
-      if (probe_rows != NULL) {
+      if (probe_rows != nullptr) {
         ss << "    Probe Rows: " << probe_rows->num_rows()
            << "    (Bytes total/pinned: " << probe_rows->byte_size() << "/"
            << probe_rows->BytesPinned(false) << ")";
@@ -1266,7 +1237,7 @@ string PartitionedHashJoinNode::NodeDebugString() const {
       ProbePartition* probe_partition = entry.second.get();
       PhjBuilder::Partition* build_partition = probe_partition->build_partition();
       DCHECK(build_partition->is_spilled());
-      DCHECK(build_partition->hash_tbl() == NULL);
+      DCHECK(build_partition->hash_tbl() == nullptr);
       int build_rows = build_partition->build_rows() == nullptr ?  -1 :
           build_partition->build_rows()->num_rows();
       int probe_rows = probe_partition->probe_rows() == nullptr ?  -1 :
@@ -1276,8 +1247,8 @@ string PartitionedHashJoinNode::NodeDebugString() const {
          << "   Spilled Probe Rows: " << probe_rows << endl;
     }
   }
-  if (input_partition_ != NULL) {
-    DCHECK(input_partition_->probe_rows() != NULL);
+  if (input_partition_ != nullptr) {
+    DCHECK(input_partition_->probe_rows() != nullptr);
     ss << "InputPartition: " << input_partition_.get() << endl;
     PhjBuilder::Partition* build_partition = input_partition_->build_partition();
     if (build_partition->IsClosed()) {
@@ -1293,7 +1264,7 @@ string PartitionedHashJoinNode::NodeDebugString() const {
   if (null_aware_probe_partition_ != nullptr) {
     ss << "null-aware probe partition ptr=" << null_aware_probe_partition_.get();
     BufferedTupleStream* probe_rows = null_aware_probe_partition_->probe_rows();
-    if (probe_rows != NULL) {
+    if (probe_rows != nullptr) {
       ss << "    Probe Rows: " << probe_rows->num_rows()
          << "    (Bytes total/pinned: " << probe_rows->byte_size() << "/"
          << probe_rows->BytesPinned(false) << ")" << endl;
@@ -1377,7 +1348,7 @@ Status PartitionedHashJoinPlanNode::CodegenCreateOutputRow(
   // Copy build row.
   llvm::BasicBlock* build_not_null_block =
       llvm::BasicBlock::Create(context, "build_not_null", *fn);
-  llvm::BasicBlock* build_null_block = NULL;
+  llvm::BasicBlock* build_null_block = nullptr;
 
   if (join_op_ == TJoinOp::LEFT_ANTI_JOIN || join_op_ == TJoinOp::LEFT_OUTER_JOIN ||
       join_op_ == TJoinOp::FULL_OUTER_JOIN ||
@@ -1410,7 +1381,7 @@ Status PartitionedHashJoinPlanNode::CodegenCreateOutputRow(
   builder.CreateRetVoid();
 
   *fn = codegen->FinalizeFunction(*fn);
-  if (*fn == NULL) {
+  if (*fn == nullptr) {
     return Status("PartitionedHashJoinNode::CodegenCreateOutputRow(): codegen'd "
         "CreateOutputRow() function failed verification, see log");
   }
@@ -1462,7 +1433,7 @@ Status PartitionedHashJoinPlanNode::CodegenProcessProbeBatch(
       DCHECK(false);
   }
   llvm::Function* process_probe_batch_fn = codegen->GetFunction(ir_fn, true);
-  DCHECK(process_probe_batch_fn != NULL);
+  DCHECK(process_probe_batch_fn != nullptr);
   process_probe_batch_fn->setName("ProcessProbeBatch");
 
   // Verifies that ProcessProbeBatch() has weak_odr linkage so it's not discarded even
@@ -1567,13 +1538,13 @@ Status PartitionedHashJoinPlanNode::CodegenProcessProbeBatch(
 
   // Finalize ProcessProbeBatch functions
   process_probe_batch_fn = codegen->FinalizeFunction(process_probe_batch_fn);
-  if (process_probe_batch_fn == NULL) {
+  if (process_probe_batch_fn == nullptr) {
     return Status("PartitionedHashJoinNode::CodegenProcessProbeBatch(): codegen'd "
         "ProcessProbeBatch() function failed verification, see log");
   }
   process_probe_batch_fn_level0 =
       codegen->FinalizeFunction(process_probe_batch_fn_level0);
-  if (process_probe_batch_fn_level0 == NULL) {
+  if (process_probe_batch_fn_level0 == nullptr) {
     return Status("PartitionedHashJoinNode::CodegenProcessProbeBatch(): codegen'd "
         "level-zero ProcessProbeBatch() function failed verification, see log");
   }
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 4be8868..3f677bf 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -446,11 +446,11 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// Initializes 'null_probe_rows_' and prepares that stream for writing.
   Status InitNullProbeRows() WARN_UNUSED_RESULT;
 
-  /// Initializes null_aware_partition_ and nulls_build_batch_ to output rows.
+  /// Prepare to output rows from the null-aware partition.
   /// *has_null_aware_rows is set to true if the null-aware partition has rows that need
   /// to be processed by calling OutputNullAwareProbeRows(), false otherwise. In both
   /// cases, null probe rows need to be processed with OutputNullAwareNullProbe().
-  Status PrepareNullAwarePartition(bool* has_null_aware_rows) WARN_UNUSED_RESULT;
+  Status BeginNullAwareProbe(bool* has_null_aware_rows) WARN_UNUSED_RESULT;
 
   /// Output rows from builder_->null_aware_partition(). Called when 'probe_state_'
   /// is OUTPUTTING_NULL_AWARE - after all input is processed, including spilled
@@ -459,9 +459,10 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   Status OutputNullAwareProbeRows(
       RuntimeState* state, RowBatch* out_batch, bool* done) WARN_UNUSED_RESULT;
 
-  /// Evaluates all other_join_conjuncts against null_probe_rows_ with all the
-  /// rows in build. This updates matched_null_probe_, short-circuiting if one of the
-  /// conjuncts pass (i.e. there is a match).
+  /// Evaluates 'other_join_conjuncts' for all pairs of 'null_probe_rows_' and the build
+  /// rows provided by the caller until a match is found for that null probe row. This
+  /// updates matched_null_probe_, short-circuiting once the conjuncts pass (i.e.
+  /// there is a match). 'build' must be pinned.
   /// This is used for NAAJ, when there are NULL probe rows.
   Status EvaluateNullProbe(
       RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT;
@@ -528,10 +529,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// rows from 'input_partition_'.
   Status BeginSpilledProbe() WARN_UNUSED_RESULT;
 
-  /// Construct an error status for the null-aware anti-join when it could not fit 'rows'
-  /// from the build side in memory.
-  Status NullAwareAntiJoinError(BufferedTupleStream* rows);
-
   /// Calls Close() on every probe partition, destroys the partitions and cleans up any
   /// references to the partitions. Also closes and destroys 'null_probe_rows_'. If
   /// 'row_batch' is not NULL, transfers ownership of all row-backing resources to it.
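
Taken together, the header changes above mean the node no longer pins the null-aware
build rows or raises NullAwareAntiJoinError itself; the builder owns that reservation.
A hypothetical member-style helper (not in the patch, and it would need a matching
declaration in the class) that spells out the order the ProbeState machine follows once
all probe input, including spilled partitions, has been consumed; batch-capacity
handling and the PrepareNullAwareNullProbe() step are elided:

    // Sketch only: the real sequencing lives in the GetNext() switch above.
    Status PartitionedHashJoinNode::OutputNullAwareSketch(
        RuntimeState* state, RowBatch* out_batch) {
      bool has_null_aware_rows;
      // Pins the shared build rows via builder_->BeginNullAwareProbe() when the
      // null-aware partition is non-empty.
      RETURN_IF_ERROR(BeginNullAwareProbe(&has_null_aware_rows));
      bool done = false;
      if (has_null_aware_rows) {
        // OUTPUTTING_NULL_AWARE: anti-join probe rows against the pinned build rows.
        while (!done) RETURN_IF_ERROR(OutputNullAwareProbeRows(state, out_batch, &done));
      }
      // OUTPUTTING_NULL_PROBE: emit NULL probe rows that never matched. The final call
      // also invokes builder_->DoneProbingNullAwarePartition() to release the shared
      // build rows.
      done = false;
      while (!done) RETURN_IF_ERROR(OutputNullAwareNullProbe(state, out_batch, &done));
      return Status::OK();
    }
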
diff --git a/be/src/exec/select-node-ir.cc b/be/src/exec/select-node-ir.cc
index 5cd5e48..2a710f5 100644
--- a/be/src/exec/select-node-ir.cc
+++ b/be/src/exec/select-node-ir.cc
@@ -16,6 +16,8 @@
 // under the License.
 
 #include "exec/select-node.h"
+
+#include "exec/exec-node.inline.h"
 #include "runtime/tuple-row.h"
 
 using namespace impala;
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index 82907da..dd91566 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -18,6 +18,7 @@
 #include "exec/unnest-node.h"
 
 #include "common/status.h"
+#include "exec/exec-node.inline.h"
 #include "exec/exec-node-util.h"
 #include "exec/subplan-node.h"
 #include "exprs/scalar-expr-evaluator.h"
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index ef0c189..37d0323 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -18,6 +18,7 @@
 #include <boost/bind.hpp>
 #include <boost/filesystem.hpp>
 #include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
 
 #include <limits> // for std::numeric_limits<int>::max()
 #include <set>
@@ -254,15 +255,31 @@ class SimpleTupleStreamTest : public testing::Test {
     }
   }
 
+  /// Read values from 'stream' into 'results' using the embedded read iterator. 'stream'
+  /// must have been prepared for reading.
   template <typename T>
   void ReadValues(BufferedTupleStream* stream, RowDescriptor* desc, vector<T>* results,
       int num_batches = -1) {
+    return ReadValues(stream, nullptr, desc, results, num_batches);
+  }
+
+  /// Read values from 'stream' into 'results'. If 'read_it' is non-NULL, reads via that
+  /// iterator. Otherwise uses the embedded read iterator, in which case 'stream' must have
+  /// been prepared for reading.
+  template <typename T>
+  void ReadValues(BufferedTupleStream* stream,
+      BufferedTupleStream::ReadIterator* read_it, RowDescriptor* desc, vector<T>* results,
+      int num_batches = -1) {
     bool eos = false;
     RowBatch batch(desc, BATCH_SIZE, &tracker_);
     int batches_read = 0;
     do {
       batch.Reset();
-      EXPECT_OK(stream->GetNext(&batch, &eos));
+      if (read_it != nullptr) {
+        EXPECT_OK(stream->GetNext(read_it, &batch, &eos));
+      } else {
+        EXPECT_OK(stream->GetNext(&batch, &eos));
+      }
       ++batches_read;
       for (int i = 0; i < batch.num_rows(); ++i) {
         AppendRowTuples(batch.GetRow(i), desc, results);
@@ -1173,7 +1190,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
     bool eos;
     ASSERT_OK(stream.GetNext(&read_batch, &eos));
     EXPECT_EQ(1, read_batch.num_rows());
-    EXPECT_TRUE(eos);
+    EXPECT_TRUE(eos) << i << " " << stream.DebugString();
     Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
     StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
     EXPECT_EQ(string_len, str->len);
@@ -1362,6 +1379,79 @@ TEST_F(SimpleTupleStreamTest, WriteAfterReadAttached) {
   write_batch->Reset();
 }
 
+/// Test multiple threads reading from a pinned stream using separate read iterators.
+TEST_F(SimpleTupleStreamTest, ConcurrentReaders) {
+  const int BUFFER_SIZE = 1024;
+  // Each tuple is an integer plus a null indicator byte.
+  const int VALS_PER_BUFFER = BUFFER_SIZE / (sizeof(int32_t) + 1);
+  const int NUM_BUFFERS = 100;
+  const int TOTAL_MEM = NUM_BUFFERS * BUFFER_SIZE;
+  Init(TOTAL_MEM);
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, &client_, BUFFER_SIZE, BUFFER_SIZE);
+  ASSERT_OK(stream.Init("ConcurrentReaders", true));
+  bool got_write_reservation;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+  ASSERT_TRUE(got_write_reservation);
+
+  // Add rows to the stream.
+  int offset = 0;
+  const int NUM_BATCHES = NUM_BUFFERS;
+  const int ROWS_PER_BATCH = VALS_PER_BUFFER;
+  for (int i = 0; i < NUM_BATCHES; ++i) {
+    RowBatch* batch = nullptr;
+    Status status;
+    batch = CreateBatch(int_desc_, offset, ROWS_PER_BATCH, false);
+    for (int j = 0; j < batch->num_rows(); ++j) {
+      bool b = stream.AddRow(batch->GetRow(j), &status);
+      ASSERT_OK(status);
+      ASSERT_TRUE(b);
+    }
+    offset += batch->num_rows();
+    // Reset the batch to make sure the stream handles the memory correctly.
+    batch->Reset();
+  }
+  // Invalidate the write iterator explicitly so that we can read concurrently.
+  stream.DoneWriting();
+
+  const int READ_ITERS = 10; // Do multiple read passes per thread.
+  const int NUM_THREADS = 4;
+
+  // Read from the main thread with the built-in iterator and other threads with
+  // external iterators.
+  thread_group workers;
+  for (int i = 0; i < NUM_THREADS; ++i) {
+    workers.add_thread(new thread([&] () {
+      for (int j = 0; j < READ_ITERS; ++j) {
+        BufferedTupleStream::ReadIterator it;
+        ASSERT_OK(stream.PrepareForPinnedRead(&it));
+
+        // Read all the rows back
+        vector<int> results;
+        ReadValues(&stream, &it, int_desc_, &results);
+
+        // Verify result
+        VerifyResults<int>(*int_desc_, results, ROWS_PER_BATCH * NUM_BATCHES, false);
+      }
+    }));
+  }
+
+  for (int i = 0; i < READ_ITERS; ++i) {
+    bool got_read_reservation;
+    ASSERT_OK(stream.PrepareForRead(false, &got_read_reservation));
+    ASSERT_TRUE(got_read_reservation);
+
+    // Read all the rows back
+    vector<int> results;
+    ReadValues(&stream, int_desc_, &results);
+
+    // Verify result
+    VerifyResults<int>(*int_desc_, results, ROWS_PER_BATCH * NUM_BATCHES, false);
+  }
+  workers.join_all();
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
 // Basic API test. No data should be going to disk.
 TEST_F(SimpleNullStreamTest, Basic) {
   Init(BUFFER_POOL_LIMIT);
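
The ConcurrentReaders test above is the intended usage of the external read iterators
added to BufferedTupleStream below: the stream must stay pinned, the write iterator must
be invalidated via DoneWriting(), and attach-on-read is not supported. The same contract
reduced to a single-threaded sketch; the function name, parameters and batch capacity
are illustrative:

    #include "common/status.h"
    #include "runtime/buffered-tuple-stream.h"
    #include "runtime/descriptors.h"
    #include "runtime/mem-tracker.h"
    #include "runtime/row-batch.h"

    namespace impala {

    // Re-read a fully-written, pinned stream twice; each pass owns its own iterator,
    // which is what lets multiple threads do the same thing concurrently.
    Status ReadPinnedStreamTwice(
        BufferedTupleStream* stream, RowDescriptor* desc, MemTracker* tracker) {
      stream->DoneWriting();  // No write iterator may coexist with external readers.
      for (int pass = 0; pass < 2; ++pass) {
        BufferedTupleStream::ReadIterator it;
        RETURN_IF_ERROR(stream->PrepareForPinnedRead(&it));
        RowBatch batch(desc, 1024, tracker);
        bool eos = false;
        do {
          batch.Reset();
          RETURN_IF_ERROR(stream->GetNext(&it, &batch, &eos));
          // ... consume the rows in 'batch' ...
        } while (!eos);
      }
      return Status::OK();
    }

    } // namespace impala
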
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index a37c443..a35a89e 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -37,11 +37,11 @@
 #include "common/names.h"
 
 #ifdef NDEBUG
-#define CHECK_CONSISTENCY_FAST()
-#define CHECK_CONSISTENCY_FULL()
+#define CHECK_CONSISTENCY_FAST(read_it)
+#define CHECK_CONSISTENCY_FULL(read_it)
 #else
-#define CHECK_CONSISTENCY_FAST() CheckConsistencyFast()
-#define CHECK_CONSISTENCY_FULL() CheckConsistencyFull()
+#define CHECK_CONSISTENCY_FAST(read_it) CheckConsistencyFast(read_it)
+#define CHECK_CONSISTENCY_FULL(read_it) CheckConsistencyFull(read_it)
 #endif
 
 using namespace impala;
@@ -65,7 +65,7 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
   DCHECK_GE(max_page_len, default_page_len);
   DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len;
   DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len;
-  read_page_ = pages_.end();
+  read_it_.Reset(&pages_);
   for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) {
     const TupleDescriptor* tuple_desc = desc_->tuple_descriptors()[i];
     const int tuple_byte_size = tuple_desc->byte_size();
@@ -95,21 +95,21 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
   }
 }
 
-void BufferedTupleStream::CheckConsistencyFull() const {
-  CheckConsistencyFast();
+void BufferedTupleStream::CheckConsistencyFull(const ReadIterator& read_it) const {
+  CheckConsistencyFast(read_it);
   // The below checks require iterating over all the pages in the stream.
   DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString();
   DCHECK_EQ(pages_.size(), num_pages_) << DebugString();
   for (const Page& page : pages_) CheckPageConsistency(&page);
 }
 
-void BufferedTupleStream::CheckConsistencyFast() const {
+void BufferedTupleStream::CheckConsistencyFast(const ReadIterator& read_it) const {
   // All the below checks should be O(1).
   DCHECK(has_write_iterator() || write_page_ == nullptr);
   if (write_page_ != nullptr) {
     CheckPageConsistency(write_page_);
     DCHECK(write_page_->is_pinned());
-    DCHECK(write_page_->retrieved_buffer);
+    DCHECK(write_page_->retrieved_buffer.Load());
     const BufferHandle* write_buffer;
     Status status = write_page_->GetBuffer(&write_buffer);
     DCHECK(status.ok()); // Write buffer should never have been unpinned.
@@ -117,18 +117,20 @@ void BufferedTupleStream::CheckConsistencyFast() const {
     DCHECK_EQ(write_end_ptr_, write_buffer->data() + write_page_->len());
     DCHECK_GE(write_end_ptr_, write_ptr_);
   }
-  DCHECK(has_read_iterator() || read_page_ == pages_.end());
-  if (read_page_ != pages_.end()) {
-    CheckPageConsistency(&*read_page_);
-    if (!read_page_->attached_to_output_batch) {
-      DCHECK(read_page_->is_pinned());
-      DCHECK(read_page_->retrieved_buffer);
+  DCHECK(read_it.is_valid() || read_it.read_page_ == pages_.end());
+  if (read_it.read_page_ != pages_.end()) {
+    CheckPageConsistency(&*read_it.read_page_);
+    if (!read_it.read_page_->attached_to_output_batch) {
+      DCHECK(read_it.read_page_->is_pinned());
+      DCHECK(read_it.read_page_->retrieved_buffer.Load());
       // Can't check read buffer without affecting behaviour, because a read may be in
       // flight and this would require blocking on that write.
-      DCHECK_GE(read_end_ptr_, read_ptr_);
+      DCHECK_GE(read_it.read_end_ptr_, read_it.read_ptr_);
     }
   }
-  if (NeedReadReservation()) {
+  DCHECK(&read_it == &read_it_ || !read_it_.attach_on_read_)
+      << "External read iterators cannot coexist with attach_on_read mode";
+  if (&read_it == &read_it_ && NeedReadReservation()) {
     DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation())
         << DebugString();
   } else if (!read_page_reservation_.is_closed()) {
@@ -157,18 +159,11 @@ void BufferedTupleStream::CheckPageConsistency(const Page* page) const {
 
 string BufferedTupleStream::DebugString() const {
   stringstream ss;
-  ss << "BufferedTupleStream num_rows=" << num_rows_
-     << " rows_returned=" << rows_returned_ << " pinned=" << pinned_
-     << " attach_on_read=" << attach_on_read_ << " closed=" << closed_ << "\n"
+  ss << "BufferedTupleStream num_rows=" << num_rows_ << " pinned=" << pinned_
+     << " closed=" << closed_ << "\n"
      << " bytes_pinned=" << bytes_pinned_ << " has_write_iterator=" << has_write_iterator_
-     << " write_page=" << write_page_ << " has_read_iterator=" << has_read_iterator_
-     << " read_page=";
-  if (read_page_ == pages_.end()) {
-    ss << "<end>";
-  } else {
-    ss << &*read_page_;
-  }
-  ss << "\n"
+     << " write_page=" << write_page_ << " read_iterator=" << read_it_.DebugString(pages_)
+     << "\n"
      << " read_page_reservation=";
   if (read_page_reservation_.is_closed()) {
     ss << "<closed>";
@@ -193,7 +188,7 @@ string BufferedTupleStream::DebugString() const {
 void BufferedTupleStream::Page::AttachBufferToBatch(
     BufferedTupleStream* parent, RowBatch* batch, FlushMode flush) {
   DCHECK(is_pinned());
-  DCHECK(retrieved_buffer);
+  DCHECK(retrieved_buffer.Load());
   parent->bytes_pinned_ -= len();
   // ExtractBuffer() cannot fail because the buffer is already in memory.
   BufferPool::BufferHandle buffer;
@@ -205,8 +200,8 @@ void BufferedTupleStream::Page::AttachBufferToBatch(
 }
 
 string BufferedTupleStream::Page::DebugString() const {
-  return Substitute("$0 num_rows=$1 retrived_buffer=$2 attached_to_output_batch=$3",
-      handle.DebugString(), num_rows, retrieved_buffer, attached_to_output_batch);
+  return Substitute("$0 num_rows=$1 retrieved_buffer=$2 attached_to_output_batch=$3",
+      handle.DebugString(), num_rows, retrieved_buffer.Load(), attached_to_output_batch);
 }
 
 Status BufferedTupleStream::Init(const string& caller_label, bool pinned) {
@@ -218,17 +213,17 @@ Status BufferedTupleStream::Init(const string& caller_label, bool pinned) {
 Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
   // This must be the first iterator created.
   DCHECK(pages_.empty());
-  DCHECK(!attach_on_read_);
+  DCHECK(!read_it_.attach_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
-  CHECK_CONSISTENCY_FULL();
+  CHECK_CONSISTENCY_FULL(read_it_);
 
   *got_reservation = buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
   if (!*got_reservation) return Status::OK();
   has_write_iterator_ = true;
   // Save reservation for the write iterators.
   buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
-  CHECK_CONSISTENCY_FULL();
+  CHECK_CONSISTENCY_FULL(read_it_);
   return Status::OK();
 }
 
@@ -236,10 +231,10 @@ Status BufferedTupleStream::PrepareForReadWrite(
     bool attach_on_read, bool* got_reservation) {
   // This must be the first iterator created.
   DCHECK(pages_.empty());
-  DCHECK(!attach_on_read_);
+  DCHECK(!read_it_.attach_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
-  CHECK_CONSISTENCY_FULL();
+  CHECK_CONSISTENCY_FULL(read_it_);
 
   *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * default_page_len_);
   if (!*got_reservation) return Status::OK();
@@ -247,14 +242,14 @@ Status BufferedTupleStream::PrepareForReadWrite(
   // Save reservation for both the read and write iterators.
   buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_);
   buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
-  RETURN_IF_ERROR(PrepareForReadInternal(attach_on_read));
+  RETURN_IF_ERROR(PrepareForReadInternal(attach_on_read, &read_it_));
   return Status::OK();
 }
 
 void BufferedTupleStream::Close(RowBatch* batch, FlushMode flush) {
   for (Page& page : pages_) {
     if (page.attached_to_output_batch) continue; // Already returned.
-    if (batch != nullptr && page.retrieved_buffer) {
+    if (batch != nullptr && page.retrieved_buffer.Load()) {
       // Subtle: We only need to attach buffers from pages that we may have returned
       // references to.
       page.AttachBufferToBatch(this, batch, flush);
@@ -309,7 +304,7 @@ void BufferedTupleStream::UnpinPageIfNeeded(Page* page, bool stream_pinned) {
     bytes_pinned_ -= page->len();
     DCHECK_GE(bytes_pinned_, 0);
     bytes_unpinned_ += page->len();
-    if (page->pin_count() == 0) page->retrieved_buffer = false;
+    if (page->pin_count() == 0) page->retrieved_buffer.Store(false);
   }
 }
 
@@ -343,8 +338,8 @@ bool BufferedTupleStream::NeedReadReservation() const {
 }
 
 bool BufferedTupleStream::NeedReadReservation(bool stream_pinned) const {
-  return NeedReadReservation(
-      stream_pinned, num_pages_, has_read_iterator(), read_page_ != pages_.end());
+  return NeedReadReservation(stream_pinned, num_pages_, has_read_iterator(),
+      read_it_.read_page_ != pages_.end());
 }
 
 bool BufferedTupleStream::NeedReadReservation(bool stream_pinned, int64_t num_pages,
@@ -401,7 +396,7 @@ Status BufferedTupleStream::CalcPageLenForRow(int64_t row_size, int64_t* page_le
 Status BufferedTupleStream::AdvanceWritePage(
     int64_t row_size, bool* got_reservation) noexcept {
   DCHECK(has_write_iterator());
-  CHECK_CONSISTENCY_FAST();
+  CHECK_CONSISTENCY_FAST(read_it_);
 
   int64_t page_len;
   RETURN_IF_ERROR(CalcPageLenForRow(row_size, &page_len));
@@ -417,9 +412,9 @@ Status BufferedTupleStream::AdvanceWritePage(
   // If the stream is pinned, we need to keep the previous write page pinned for reading.
   // Check if we saved reservation for this case.
   if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(),
-          read_page_ != pages_.end(), true, write_page_ != nullptr)
+          read_it_.read_page_ != pages_.end(), true, write_page_ != nullptr)
       && !NeedReadReservation(pinned_, num_pages_ + 1, has_read_iterator(),
-             read_page_ != pages_.end(), true, true)) {
+             read_it_.read_page_ != pages_.end(), true, true)) {
     read_reservation_to_restore = default_page_len_;
   }
 
@@ -433,7 +428,7 @@ Status BufferedTupleStream::AdvanceWritePage(
           - write_page_reservation_to_reclaim)) {
     DCHECK(pinned_ || page_len > default_page_len_)
         << "If the stream is unpinned, this should only fail for large pages";
-    CHECK_CONSISTENCY_FAST();
+    CHECK_CONSISTENCY_FAST(read_it_);
     *got_reservation = false;
     return Status::OK();
   }
@@ -472,55 +467,59 @@ void BufferedTupleStream::InvalidateWriteIterator() {
   write_page_reservation_.Close();
   // May not need a read reservation once the write iterator is invalidated.
   if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(),
-          read_page_ != pages_.end(), true, write_page_ != nullptr)
+          read_it_.read_page_ != pages_.end(), true, write_page_ != nullptr)
       && !NeedReadReservation(pinned_, num_pages_, has_read_iterator(),
-             read_page_ != pages_.end(), false, false)) {
+             read_it_.read_page_ != pages_.end(), false, false)) {
     buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
   }
 }
 
-Status BufferedTupleStream::NextReadPage() {
-  DCHECK(has_read_iterator());
+Status BufferedTupleStream::NextReadPage(ReadIterator* read_iter) {
+  DCHECK(read_iter->is_valid());
   DCHECK(!closed_);
-  CHECK_CONSISTENCY_FAST();
+  DCHECK(read_iter == &read_it_ || (pinned_ && !read_iter->attach_on_read_))
+      << "External read iterators only support pinned streams with no attach on read "
+      << read_iter->DebugString(pages_);
+  CHECK_CONSISTENCY_FAST(*read_iter);
 
-  if (read_page_ == pages_.end()) {
+  if (read_iter->read_page_ == pages_.end()) {
     // No rows read yet - start reading at first page. If the stream is unpinned, we can
     // use the reservation saved in PrepareForReadWrite() to pin the first page.
-    read_page_ = pages_.begin();
+    read_iter->SetReadPage(pages_.begin());
     if (NeedReadReservation(pinned_, num_pages_, true, false)
         && !NeedReadReservation(pinned_, num_pages_, true, true)) {
       buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
     }
-  } else if (attach_on_read_) {
-    DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " "
-                                         << DebugString();
-    DCHECK_NE(&*read_page_, write_page_);
-    DCHECK(read_page_->attached_to_output_batch);
+  } else if (read_iter->attach_on_read_) {
+    DCHECK(read_iter->read_page_ == pages_.begin())
+        << read_iter->read_page_->DebugString() << " " << DebugString();
+    DCHECK_NE(&*read_iter->read_page_, write_page_);
+    DCHECK(read_iter->read_page_->attached_to_output_batch);
     pages_.pop_front();
     --num_pages_;
-    read_page_ = pages_.begin();
+    read_iter->SetReadPage(pages_.begin());
   } else {
     // Unpin pages after reading them if needed.
-    Page* prev_read_page = &*read_page_;
-    ++read_page_;
+    Page* prev_read_page = &*read_iter->read_page_;
+    read_iter->AdvanceReadPage(pages_);
     UnpinPageIfNeeded(prev_read_page, pinned_);
   }
 
-  if (read_page_ == pages_.end()) {
-    CHECK_CONSISTENCY_FULL();
+  if (read_iter->read_page_ == pages_.end()) {
+    CHECK_CONSISTENCY_FULL(*read_iter);
     return Status::OK();
   }
 
-  if (!pinned_ && read_page_->len() > default_page_len_
-      && buffer_pool_client_->GetUnusedReservation() < read_page_->len()) {
+  int64_t read_page_len = read_iter->read_page_->len();
+  if (!pinned_ && read_page_len > default_page_len_
+      && buffer_pool_client_->GetUnusedReservation() < read_page_len) {
     // If we are iterating over an unpinned stream and encounter a page that is larger
     // than the default page length, then unpinning the previous page may not have
     // freed up enough reservation to pin the next one. The client is responsible for
     // ensuring the reservation is available, so this indicates a bug.
     return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: couldn't pin "
           "large page of $0 bytes, client only had $1 bytes of unused reservation:\n$2",
-          read_page_->len(), buffer_pool_client_->GetUnusedReservation(),
+          read_page_len, buffer_pool_client_->GetUnusedReservation(),
           buffer_pool_client_->DebugString()));
   }
   // Ensure the next page is pinned for reading. By this point we should have enough
@@ -528,15 +527,8 @@ Status BufferedTupleStream::NextReadPage() {
   // If the stream is unpinned, we freed up enough memory for a default-sized page by
   // deleting or unpinning the previous page and ensured that, if the page was larger,
   // that the reservation is available with the above check.
-  RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
-
-  // This waits for the pin to complete if the page was unpinned earlier.
-  const BufferHandle* read_buffer;
-  RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
-
-  read_page_rows_returned_ = 0;
-  read_ptr_ = read_buffer->data();
-  read_end_ptr_ = read_ptr_ + read_buffer->len();
+  RETURN_IF_ERROR(PinPageIfNeeded(&*read_iter->read_page_, pinned_));
+  RETURN_IF_ERROR(read_iter->InitReadPtrs());
 
   // We may need to save reservation for the write page in the case when the write page
   // became a read/write page.
@@ -546,76 +538,80 @@ Status BufferedTupleStream::NextReadPage() {
              write_page_ != nullptr, has_read_write_page())) {
     buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
   }
-  CHECK_CONSISTENCY_FAST();
+  CHECK_CONSISTENCY_FAST(*read_iter);
   return Status::OK();
 }
 
 void BufferedTupleStream::InvalidateReadIterator() {
-  if (read_page_ != pages_.end()) {
+  int64_t rows_returned = read_it_.rows_returned();
+  if (read_it_.read_page_ != pages_.end()) {
     // Unpin the write page if we're reading in unpinned mode.
-    Page* prev_read_page = &*read_page_;
-    read_page_ = pages_.end();
-    read_ptr_ = nullptr;
-    read_end_ptr_ = nullptr;
+    Page* prev_read_page = &*read_it_.read_page_;
+    read_it_.Reset(&pages_);
 
     // May need to decrement pin count after destroying read iterator.
     UnpinPageIfNeeded(prev_read_page, pinned_);
+  } else {
+    read_it_.Reset(&pages_);
   }
-  has_read_iterator_ = false;
   if (read_page_reservation_.GetReservation() > 0) {
     buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
   }
   // It is safe to re-read an attach-on-read stream if no rows were read and no pages
   // were therefore deleted.
-  DCHECK(attach_on_read_ == false || rows_returned_ == 0);
-  if (rows_returned_ == 0) attach_on_read_ = false;
+  DCHECK(read_it_.attach_on_read_ == false || rows_returned == 0);
+  if (rows_returned == 0 && read_it_.attach_on_read_) {
+    read_it_.attach_on_read_ = false;
+  }
+}
+
+void BufferedTupleStream::DoneWriting() {
+  CHECK_CONSISTENCY_FULL(read_it_);
+  InvalidateWriteIterator();
 }
 
 Status BufferedTupleStream::PrepareForRead(bool attach_on_read, bool* got_reservation) {
-  CHECK_CONSISTENCY_FULL();
+  CHECK_CONSISTENCY_FULL(read_it_);
   InvalidateWriteIterator();
   InvalidateReadIterator();
   // If already pinned, no additional pin is needed (see ExpectedPinCount()).
   *got_reservation = pinned_ || pages_.empty()
       || buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
   if (!*got_reservation) return Status::OK();
-  return PrepareForReadInternal(attach_on_read);
+  return PrepareForReadInternal(attach_on_read, &read_it_);
+}
+
+Status BufferedTupleStream::PrepareForPinnedRead(ReadIterator* iter) {
+  DCHECK(pinned_) << "Can only read pinned stream with external iterator";
+  DCHECK(!has_write_iterator());
+  return PrepareForReadInternal(false, iter);
 }
 
-Status BufferedTupleStream::PrepareForReadInternal(bool attach_on_read) {
+Status BufferedTupleStream::PrepareForReadInternal(
+    bool attach_on_read, ReadIterator* read_iter) {
   DCHECK(!closed_);
-  DCHECK(!attach_on_read_);
-  DCHECK(!has_read_iterator());
+  DCHECK(!read_it_.attach_on_read_);
+  DCHECK(!read_iter->is_valid());
 
-  has_read_iterator_ = true;
+  read_iter->Init(attach_on_read);
   if (pages_.empty()) {
     // No rows to return, or the first read/write page has not yet been allocated.
-    read_page_ = pages_.end();
-    read_ptr_ = nullptr;
-    read_end_ptr_ = nullptr;
+    read_iter->SetReadPage(pages_.end());
   } else {
     // Eagerly pin the first page in the stream.
-    read_page_ = pages_.begin();
+    read_iter->SetReadPage(pages_.begin());
     // Check if we need to increment the pin count of the read page.
-    RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
-    DCHECK(read_page_->is_pinned());
-
-    // This waits for the pin to complete if the page was unpinned earlier.
-    const BufferHandle* read_buffer;
-    RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
-    read_ptr_ = read_buffer->data();
-    read_end_ptr_ = read_ptr_ + read_buffer->len();
+    RETURN_IF_ERROR(PinPageIfNeeded(&*read_iter->read_page_, pinned_));
+    DCHECK(read_iter->read_page_->is_pinned());
+    RETURN_IF_ERROR(read_iter->InitReadPtrs());
   }
-  read_page_rows_returned_ = 0;
-  rows_returned_ = 0;
-  attach_on_read_ = attach_on_read;
-  CHECK_CONSISTENCY_FULL();
+  CHECK_CONSISTENCY_FULL(*read_iter);
   return Status::OK();
 }
 
 Status BufferedTupleStream::PinStream(bool* pinned) {
   DCHECK(!closed_);
-  CHECK_CONSISTENCY_FULL();
+  CHECK_CONSISTENCY_FULL(read_it_);
   if (pinned_) {
     *pinned = true;
     return Status::OK();
@@ -655,12 +651,12 @@ Status BufferedTupleStream::PinStream(bool* pinned) {
 
   pinned_ = true;
   *pinned = true;
-  CHECK_CONSISTENCY_FULL();
+  CHECK_CONSISTENCY_FULL(read_it_);
   return Status::OK();
 }
 
 Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
-  CHECK_CONSISTENCY_FULL();
+  CHECK_CONSISTENCY_FULL(read_it_);
   DCHECK(!closed_);
   if (mode == UNPIN_ALL) {
     // Invalidate the iterators so they don't keep pages pinned.
@@ -669,10 +665,10 @@ Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
   }
 
   if (pinned_) {
-    CHECK_CONSISTENCY_FULL();
-    if (&*read_page_ != write_page_ && read_page_ != pages_.end()
-        && read_page_rows_returned_ == read_page_->num_rows) {
-      RETURN_IF_ERROR(NextReadPage());
+    CHECK_CONSISTENCY_FULL(read_it_);
+    if (&*read_it_.read_page_ != write_page_ && read_it_.read_page_ != pages_.end()
+        && read_it_.read_page_rows_returned_ == read_it_.read_page_->num_rows) {
+      RETURN_IF_ERROR(NextReadPage(&read_it_));
     }
 
     // If the stream was pinned, there may be some remaining pinned pages that should
@@ -688,63 +684,69 @@ Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
     }
     pinned_ = false;
   }
-  CHECK_CONSISTENCY_FULL();
+  CHECK_CONSISTENCY_FULL(read_it_);
   return Status::OK();
 }
 
 Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos) {
-  return GetNextInternal<false>(batch, eos, nullptr);
+  return GetNextInternal<false>(&read_it_, batch, eos, nullptr);
+}
+
+Status BufferedTupleStream::GetNext(ReadIterator* read_iter, RowBatch* batch, bool* eos) {
+  DCHECK(pinned_) << "Stream must remain pinned";
+  DCHECK(read_iter->is_valid());
+  return GetNextInternal<false>(read_iter, batch, eos, nullptr);
 }
 
 Status BufferedTupleStream::GetNext(
     RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
-  return GetNextInternal<true>(batch, eos, flat_rows);
+  return GetNextInternal<true>(&read_it_, batch, eos, flat_rows);
 }
 
 template <bool FILL_FLAT_ROWS>
-Status BufferedTupleStream::GetNextInternal(
+Status BufferedTupleStream::GetNextInternal(ReadIterator* RESTRICT read_iter,
     RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
   if (has_nullable_tuple_) {
-    return GetNextInternal<FILL_FLAT_ROWS, true>(batch, eos, flat_rows);
+    return GetNextInternal<FILL_FLAT_ROWS, true>(read_iter, batch, eos, flat_rows);
   } else {
-    return GetNextInternal<FILL_FLAT_ROWS, false>(batch, eos, flat_rows);
+    return GetNextInternal<FILL_FLAT_ROWS, false>(read_iter, batch, eos, flat_rows);
   }
 }
 
 template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE>
-Status BufferedTupleStream::GetNextInternal(
+Status BufferedTupleStream::GetNextInternal(ReadIterator* RESTRICT read_iter,
     RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
   DCHECK(!closed_);
   DCHECK(batch->row_desc()->Equals(*desc_));
   DCHECK(is_pinned() || !FILL_FLAT_ROWS)
       << "FlatRowPtrs are only valid for pinned streams";
-  *eos = (rows_returned_ == num_rows_);
+  *eos = (read_iter->rows_returned_ == num_rows_);
   if (*eos) return Status::OK();
 
-  if (UNLIKELY(read_page_ == pages_.end()
-          || read_page_rows_returned_ == read_page_->num_rows)) {
-    if (read_page_ != pages_.end() && attach_on_read_
-        && !read_page_->attached_to_output_batch) {
+  if (UNLIKELY(read_iter->read_page_ == pages_.end()
+          || read_iter->read_page_rows_returned_ == read_iter->read_page_->num_rows)) {
+    if (read_iter->read_page_ != pages_.end() && read_iter->attach_on_read_
+        && !read_iter->read_page_->attached_to_output_batch) {
       DCHECK(has_write_iterator());
       // We're in a read-write stream in the case where we're at the end of the read page
       // but the buffer was not attached on the last GetNext() call because the write
       // iterator had not yet advanced.
-      read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
+      read_iter->read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
       return Status::OK();
     }
     // Get the next page in the stream (or the first page if read_page_ was not yet
     // initialized.) We need to do this at the beginning of the GetNext() call to ensure
     // the buffer management semantics. NextReadPage() may unpin or delete the buffer
     // backing the rows returned from the *previous* call to GetNext().
-    RETURN_IF_ERROR(NextReadPage());
+    RETURN_IF_ERROR(NextReadPage(read_iter));
   }
 
-  DCHECK(has_read_iterator());
-  DCHECK(read_page_ != pages_.end());
-  DCHECK(read_page_->is_pinned()) << DebugString();
-  DCHECK_GE(read_page_rows_returned_, 0);
+  DCHECK(read_iter->is_valid());
+  DCHECK(read_iter->read_page_ != pages_.end());
+  DCHECK(read_iter->read_page_->is_pinned()) << DebugString();
+  DCHECK_GE(read_iter->read_page_rows_returned_, 0);
 
-  int rows_left_in_page = read_page_->num_rows - read_page_rows_returned_;
+  int rows_left_in_page = read_iter->GetRowsLeftInPage();
   int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), rows_left_in_page);
   DCHECK_GE(rows_to_fill, 1);
   uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows()));
@@ -753,29 +755,29 @@ Status BufferedTupleStream::GetNextInternal(
   // null tuple indicator.
   if (FILL_FLAT_ROWS) {
     DCHECK(flat_rows != nullptr);
-    DCHECK(!attach_on_read_);
+    DCHECK(!read_iter->attach_on_read_);
     DCHECK_EQ(batch->num_rows(), 0);
     flat_rows->clear();
     flat_rows->reserve(rows_to_fill);
   }
 
   const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
-  // Start reading from the current position in 'read_page_'.
+  // Start reading from the current position in 'read_iter->read_page_'.
   for (int i = 0; i < rows_to_fill; ++i) {
     if (FILL_FLAT_ROWS) {
-      flat_rows->push_back(read_ptr_);
+      flat_rows->push_back(read_iter->read_ptr_);
       DCHECK_EQ(flat_rows->size(), i + 1);
     }
     // Copy the row into the output batch.
     TupleRow* output_row = reinterpret_cast<TupleRow*>(tuple_row_mem);
     tuple_row_mem += sizeof(Tuple*) * tuples_per_row;
-    UnflattenTupleRow<HAS_NULLABLE_TUPLE>(&read_ptr_, output_row);
+    UnflattenTupleRow<HAS_NULLABLE_TUPLE>(&read_iter->read_ptr_, output_row);
 
     // Update string slot ptrs, skipping external strings.
     for (int j = 0; j < inlined_string_slots_.size(); ++j) {
       Tuple* tuple = output_row->GetTuple(inlined_string_slots_[j].first);
       if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
-      FixUpStringsForRead(inlined_string_slots_[j].second, tuple);
+      FixUpStringsForRead(inlined_string_slots_[j].second, read_iter, tuple);
     }
 
     // Update collection slot ptrs, skipping external collections. We traverse the
@@ -784,15 +786,14 @@ Status BufferedTupleStream::GetNextInternal(
     for (int j = 0; j < inlined_coll_slots_.size(); ++j) {
       Tuple* tuple = output_row->GetTuple(inlined_coll_slots_[j].first);
       if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
-      FixUpCollectionsForRead(inlined_coll_slots_[j].second, tuple);
+      FixUpCollectionsForRead(inlined_coll_slots_[j].second, read_iter, tuple);
     }
   }
 
   batch->CommitRows(rows_to_fill);
-  rows_returned_ += rows_to_fill;
-  read_page_rows_returned_ += rows_to_fill;
-  *eos = (rows_returned_ == num_rows_);
-  if (read_page_rows_returned_ == read_page_->num_rows) {
+  read_iter->IncrRowsReturned(rows_to_fill);
+  *eos = (read_iter->rows_returned() == num_rows_);
+  if (read_iter->GetRowsLeftInPage() == 0) {
     // No more data in this page. NextReadPage() may need to reuse the reservation
     // currently used for 'read_page_' so we may need to flush resources. When
     // 'attach_on_read_' is true, we're returning the buffer. Otherwise the buffer will
@@ -800,13 +801,14 @@ Status BufferedTupleStream::GetNextInternal(
     // signal to the caller that the resources are going away. Note that if there is a
     // read-write page it is not safe to attach the buffer yet because more rows may be
     // appended to the page.
-    if (attach_on_read_) {
+    if (read_iter->attach_on_read_) {
       if (!has_read_write_page()) {
         // Safe to attach because we already called GetBuffer() in NextReadPage().
         // TODO: always flushing for pinned streams is overkill since we may not need
         // to reuse the reservation immediately. Changing this may require modifying
         // callers of this class.
-        read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
+        read_iter->read_page_->AttachBufferToBatch(
+            this, batch, FlushMode::FLUSH_RESOURCES);
       }
     } else if (!pinned_) {
       // Flush resources so that we can safely unpin the page on the next GetNext() call.
@@ -816,25 +818,25 @@ Status BufferedTupleStream::GetNextInternal(
     }
   }
   if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
-  DCHECK_LE(read_ptr_, read_end_ptr_);
+  DCHECK_LE(read_iter->read_ptr_, read_iter->read_end_ptr_);
   return Status::OK();
 }
 
-void BufferedTupleStream::FixUpStringsForRead(
-    const vector<SlotDescriptor*>& string_slots, Tuple* tuple) {
+void BufferedTupleStream::FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots,
+    ReadIterator* RESTRICT read_iter, Tuple* tuple) {
   DCHECK(tuple != nullptr);
   for (const SlotDescriptor* slot_desc : string_slots) {
     if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
 
     StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset());
-    DCHECK_LE(read_ptr_ + sv->len, read_end_ptr_);
-    sv->ptr = reinterpret_cast<char*>(read_ptr_);
-    read_ptr_ += sv->len;
+    sv->ptr = reinterpret_cast<char*>(read_iter->read_ptr_);
+    read_iter->AdvanceReadPtr(sv->len);
   }
 }
 
 void BufferedTupleStream::FixUpCollectionsForRead(
-    const vector<SlotDescriptor*>& collection_slots, Tuple* tuple) {
+    const vector<SlotDescriptor*>& collection_slots, ReadIterator* RESTRICT read_iter,
+    Tuple* tuple) {
   DCHECK(tuple != nullptr);
   for (const SlotDescriptor* slot_desc : collection_slots) {
     if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
@@ -842,16 +844,15 @@ void BufferedTupleStream::FixUpCollectionsForRead(
     CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
     const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
     int coll_byte_size = cv->num_tuples * item_desc.byte_size();
-    DCHECK_LE(read_ptr_ + coll_byte_size, read_end_ptr_);
-    cv->ptr = reinterpret_cast<uint8_t*>(read_ptr_);
-    read_ptr_ += coll_byte_size;
+    cv->ptr = reinterpret_cast<uint8_t*>(read_iter->read_ptr_);
+    read_iter->AdvanceReadPtr(coll_byte_size);
 
     if (!item_desc.HasVarlenSlots()) continue;
     uint8_t* coll_data = cv->ptr;
     for (int i = 0; i < cv->num_tuples; ++i) {
       Tuple* item = reinterpret_cast<Tuple*>(coll_data);
-      FixUpStringsForRead(item_desc.string_slots(), item);
-      FixUpCollectionsForRead(item_desc.collection_slots(), item);
+      FixUpStringsForRead(item_desc.string_slots(), read_iter, item);
+      FixUpCollectionsForRead(item_desc.collection_slots(), read_iter, item);
       coll_data += item_desc.byte_size();
     }
   }
@@ -934,7 +935,7 @@ void BufferedTupleStream::AddLargeRowCustomEnd(int64_t size) noexcept {
     buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
   }
   // The stream should be in a consistent state once the row is added.
-  CHECK_CONSISTENCY_FAST();
+  CHECK_CONSISTENCY_FAST(read_it_);
 }
 
 bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) noexcept {
@@ -1065,37 +1066,63 @@ bool BufferedTupleStream::CopyCollections(const Tuple* tuple,
   return true;
 }
 
-void BufferedTupleStream::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const {
-  DCHECK(row != nullptr);
-  DCHECK(!closed_);
-  DCHECK(is_pinned());
-  DCHECK(!attach_on_read_);
-  uint8_t* data = flat_row;
-  return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) :
-                               UnflattenTupleRow<false>(&data, row);
+void BufferedTupleStream::ReadIterator::Reset(std::list<Page>* pages) {
+  valid_ = false;
+  read_page_ = pages->end();
+  rows_returned_ = 0;
+  read_page_rows_returned_ = -1;
+  read_ptr_ = nullptr;
+  read_end_ptr_ = nullptr;
 }
 
-template <bool HAS_NULLABLE_TUPLE>
-void BufferedTupleStream::UnflattenTupleRow(uint8_t** data, TupleRow* row) const {
-  const int tuples_per_row = desc_->tuple_descriptors().size();
-  uint8_t* ptr = *data;
-  if (HAS_NULLABLE_TUPLE) {
-    // Stitch together the tuples from the page and the NULL ones.
-    const uint8_t* null_indicators = ptr;
-    ptr += NullIndicatorBytesPerRow();
-    for (int i = 0; i < tuples_per_row; ++i) {
-      const uint8_t* null_word = null_indicators + (i >> 3);
-      const uint32_t null_pos = i & 7;
-      const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
-      row->SetTuple(
-          i, reinterpret_cast<Tuple*>(reinterpret_cast<uint64_t>(ptr) * is_not_null));
-      ptr += fixed_tuple_sizes_[i] * is_not_null;
-    }
+void BufferedTupleStream::ReadIterator::Init(bool attach_on_read) {
+  valid_ = true;
+  rows_returned_ = 0;
+  DCHECK(!attach_on_read_) << "attach_on_read can only be set once";
+  attach_on_read_ = attach_on_read;
+}
+
+void BufferedTupleStream::ReadIterator::SetReadPage(list<Page>::iterator read_page) {
+  read_page_ = read_page;
+  read_ptr_ = nullptr;
+  read_end_ptr_ = nullptr;
+  read_page_rows_returned_ = 0;
+}
+
+void BufferedTupleStream::ReadIterator::AdvanceReadPage(const list<Page>& pages) {
+  DCHECK(read_page_ != pages.end());
+  ++read_page_;
+  read_ptr_ = nullptr;
+  read_end_ptr_ = nullptr;
+  read_page_rows_returned_ = 0;
+}
+
+Status BufferedTupleStream::ReadIterator::InitReadPtrs() {
+  DCHECK(read_page_->is_pinned());
+  DCHECK_EQ(read_page_rows_returned_, 0);
+  const BufferHandle* read_buffer;
+  RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
+  read_ptr_ = read_buffer->data();
+  read_end_ptr_ = read_ptr_ + read_buffer->len();
+  return Status::OK();
+}
+
+void BufferedTupleStream::ReadIterator::IncrRowsReturned(int64_t rows) {
+  rows_returned_ += rows;
+  read_page_rows_returned_ += rows;
+}
+
+string BufferedTupleStream::ReadIterator::DebugString(const list<Page>& pages) const {
+  stringstream ss;
+  ss << "{valid=" << valid_ << " attach_on_read=" << attach_on_read_ << " read_page=";
+  if (read_page_ == pages.end()) {
+    ss << "<end>";
   } else {
-    for (int i = 0; i < tuples_per_row; ++i) {
-      row->SetTuple(i, reinterpret_cast<Tuple*>(ptr));
-      ptr += fixed_tuple_sizes_[i];
-    }
+    ss << read_page_->DebugString();
   }
-  *data = ptr;
+  ss << " read_page_rows_returned=" << read_page_rows_returned_
+     << " rows_returned=" << rows_returned_
+     << " read_ptr_=" << static_cast<const void*>(read_ptr_)
+     << " read_end_ptr_=" << static_cast<const void*>(read_end_ptr_) << "}";
+  return ss.str();
 }
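
FixUpStringsForRead() and FixUpCollectionsForRead() above repair var-len slot pointers as
rows are read back: string and collection bytes are stored inline in the stream page, so
on read each slot's pointer is re-pointed at the inline bytes and the iterator's read
pointer advances past them. Below is a minimal standalone sketch of that idea, using
hypothetical names (StringSlot, ReadCursor) rather than Impala's classes:

  // Sketch of inline var-len data fix-up on read; hypothetical types, not Impala's API.
  #include <cstdint>
  #include <cstring>
  #include <iostream>
  #include <string>
  #include <vector>

  struct StringSlot {
    const char* ptr;  // repaired on read to point into the stream's buffer
    int32_t len;
  };

  // Append the fixed-size slot followed by the string bytes to 'buf'.
  void Serialize(const std::string& s, std::vector<uint8_t>* buf) {
    StringSlot slot{nullptr, static_cast<int32_t>(s.size())};  // ptr is rewritten on read
    const uint8_t* raw = reinterpret_cast<const uint8_t*>(&slot);
    buf->insert(buf->end(), raw, raw + sizeof(slot));
    buf->insert(buf->end(), s.begin(), s.end());
  }

  // Per-reader cursor, analogous to ReadIterator's read_ptr_/read_end_ptr_.
  struct ReadCursor {
    const uint8_t* ptr;
    const uint8_t* end;
  };

  // Read one row: copy out the fixed slot, then fix its pointer to the inline bytes.
  StringSlot ReadOne(ReadCursor* it) {
    StringSlot slot;
    std::memcpy(&slot, it->ptr, sizeof(slot));
    it->ptr += sizeof(slot);
    slot.ptr = reinterpret_cast<const char*>(it->ptr);  // fix-up: point at inline data
    it->ptr += slot.len;                                // advance past the var-len bytes
    return slot;
  }

  int main() {
    std::vector<uint8_t> stream;
    Serialize("hello", &stream);
    Serialize("tuple streams", &stream);
    ReadCursor it{stream.data(), stream.data() + stream.size()};
    while (it.ptr < it.end) {
      StringSlot s = ReadOne(&it);
      std::cout << std::string(s.ptr, s.len) << "\n";
    }
    return 0;
  }

The important property is that all mutable read state lives in the cursor passed by the
caller, which is what allows the fix-up helpers above to become static methods that take a
ReadIterator argument.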
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index e98a9fe..805cd86 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -20,9 +20,11 @@
 
 #include <set>
 #include <vector>
-#include <boost/scoped_ptr.hpp>
 #include <boost/function.hpp>
+#include <boost/scoped_ptr.hpp>
 
+#include "common/atomic.h"
+#include "common/compiler-util.h"
 #include "common/global-types.h"
 #include "common/status.h"
 #include "gutil/macros.h"
@@ -55,7 +57,11 @@ class TupleRow;
 /// To use write-only mode, PrepareForWrite() is called once and AddRow()/AddRowCustom*()
 /// are called repeatedly to initialize then advance a write iterator through the stream.
 /// Once the stream is fully written, it can be read back by calling PrepareForRead()
-/// then GetNext() repeatedly to advance a read iterator through the stream.
+/// then GetNext() repeatedly to advance a read iterator through the stream. If the
+/// stream is pinned, external read iterators can be created with
+/// PrepareForPinnedRead(). Multiple external read iterators can be active at the same
+/// time, reading from different positions in the stream. External read iterators are
+/// thread-safe, which allows safe concurrent access to the stream from different threads.
 ///
 /// To use read/write mode, PrepareForReadWrite() is called once to initialize the read
 /// and write iterators. AddRow()/AddRowCustom*() then advance a write iterator through
@@ -202,6 +208,7 @@ class TupleRow;
 /// TODO: prefetching for pages could speed up iteration over unpinned streams.
 class BufferedTupleStream {
  public:
+  class ReadIterator;
   /// A pointer to the start of a flattened TupleRow in the stream.
   typedef uint8_t* FlatRowPtr;
 
@@ -220,7 +227,7 @@ class BufferedTupleStream {
   /// 'caller_label'. Must be called once before any of the other APIs.
   /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is unpinned.
   /// 'caller_label' is only used for error reporting.
-  Status Init(const std::string& caller_label, bool pinned) WARN_UNUSED_RESULT;
+  Status Init(const std::string& caller_label, bool pinned);
 
   /// Prepares the stream for writing by saving enough reservation for a default-size
   /// write page. Tries to increase reservation if there is not enough unused reservation
@@ -229,7 +236,7 @@ class BufferedTupleStream {
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     first write page and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
-  Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT;
+  Status PrepareForWrite(bool* got_reservation);
 
   /// Prepares the stream for interleaved reads and writes by saving enough reservation
   /// for default-sized read and write pages. Called after Init() and before the first
@@ -239,7 +246,11 @@ class BufferedTupleStream {
   ///     read and write pages and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
   Status PrepareForReadWrite(
-      bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+      bool attach_on_read, bool* got_reservation);
+
+  /// Explicitly finishes writing the stream, invalidating the write iterator (if
+  /// there is one). Must be called after the last AddRow() or AddRowCustomEnd().
+  void DoneWriting();
 
   /// Prepares the stream for reading, invalidating the write iterator (if there is one).
   /// Therefore must be called after the last AddRow() or AddRowCustomEnd() and before
@@ -250,7 +261,14 @@ class BufferedTupleStream {
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     first read page and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
-  Status PrepareForRead(bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+  Status PrepareForRead(bool attach_on_read, bool* got_reservation);
+
+  /// Prepares 'iter' for reading a pinned stream. The stream must not have a write
+  /// iterator and the stream must be pinned. This does not attach pages on read.
+  ///
+  /// This method is safe to call concurrently from different threads, as long as the
+  /// stream remains pinned and 'iter' is not shared between threads.
+  Status PrepareForPinnedRead(ReadIterator* iter);
 
   /// Adds a single row to the stream. There are three possible outcomes:
   /// a) The append succeeds. True is returned.
@@ -271,7 +289,7 @@ class BufferedTupleStream {
   ///
   /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow()
   /// returns an error, it should not be called again.
-  bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT;
+  bool AddRow(TupleRow* row, Status* status) noexcept;
 
   /// Allocates space to store a row of 'size' bytes (including fixed and variable length
   /// data). If successful, returns a pointer to the allocated row. The caller then must
@@ -298,7 +316,7 @@ class BufferedTupleStream {
   /// If the current unused reservation is not sufficient to pin the stream in memory,
   /// this will try to increase the reservation. If that fails, 'pinned' is set to false
   /// and the stream is left unpinned. Otherwise 'pinned' is set to true.
-  Status PinStream(bool* pinned) WARN_UNUSED_RESULT;
+  Status PinStream(bool* pinned);
 
   /// Modes for UnpinStream().
   enum UnpinMode {
@@ -316,7 +334,7 @@ class BufferedTupleStream {
   };
 
   /// Unpins stream with the given 'mode' as described above.
-  Status UnpinStream(UnpinMode mode) WARN_UNUSED_RESULT;
+  Status UnpinStream(UnpinMode mode);
 
   /// Get the next batch of output rows, which are backed by the stream's memory.
   ///
@@ -334,13 +352,21 @@ class BufferedTupleStream {
   ///
   /// If the stream is pinned and 'attach_on_read' is false, the memory backing the
   /// rows will remain valid until the stream is unpinned, destroyed, etc.
-  Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
+  Status GetNext(RowBatch* batch, bool* eos);
+
+  /// Get the next batch of output rows from 'read_iter', which are backed by the
+  /// pinned stream's memory. The memory backing the rows is valid as long as the
+  /// stream remains pinned.
+  ///
+  /// This method is safe to call concurrently from different threads, as long as the
+  /// stream remains pinned and 'read_iter' is not shared between threads.
+  Status GetNext(ReadIterator* read_iter, RowBatch* batch, bool* eos);
 
   /// Same as above, but populate 'flat_rows' with a pointer to the flat version of
   /// each returned row in the pinned stream. The pointers in 'flat_rows' are only
   /// valid as long as the stream remains pinned.
   Status GetNext(
-      RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows) WARN_UNUSED_RESULT;
+      RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows);
 
   /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL,
   /// attaches buffers from pinned pages that rows returned from GetNext() may reference.
@@ -352,7 +378,7 @@ class BufferedTupleStream {
   int64_t num_rows() const { return num_rows_; }
 
   /// Number of rows returned via GetNext().
-  int64_t rows_returned() const { return rows_returned_; }
+  int64_t rows_returned() const { return read_it_.rows_returned_; }
 
   /// Returns the byte size necessary to store the entire stream in memory.
   int64_t byte_size() const { return total_byte_size_; }
@@ -373,25 +399,31 @@ class BufferedTupleStream {
 
   bool is_closed() const { return closed_; }
   bool is_pinned() const { return pinned_; }
-  bool has_read_iterator() const { return has_read_iterator_; }
+  /// Returns true if there is an active embedded read iterator.
+  bool has_read_iterator() const { return read_it_.is_valid(); }
   bool has_write_iterator() const { return has_write_iterator_; }
 
   std::string DebugString() const;
 
  private:
-  DISALLOW_COPY_AND_ASSIGN(BufferedTupleStream);
-  friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
-  friend class ArrayTupleStreamTest_TestComputeRowSize_Test;
-  friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test;
-
   /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
   struct Page {
+    /// Default constructor.
+    Page() {}
+
+    /// Move constructor so this can reside in a vector. Not thread-safe.
+    Page(Page&& src)
+      : handle(std::move(src.handle)),
+        num_rows(src.num_rows),
+        retrieved_buffer(src.retrieved_buffer.Load()),
+        attached_to_output_batch(src.attached_to_output_batch) {}
+
     inline int len() const { return handle.len(); }
     inline bool is_pinned() const { return handle.is_pinned(); }
     inline int pin_count() const { return handle.pin_count(); }
     Status GetBuffer(const BufferPool::BufferHandle** buffer) {
       RETURN_IF_ERROR(handle.GetBuffer(buffer));
-      retrieved_buffer = true;
+      retrieved_buffer.Store(true);
       return Status::OK();
     }
 
@@ -409,14 +441,110 @@ class BufferedTupleStream {
 
     /// Whether we called GetBuffer() on the page since it was last pinned. This means
     /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have
-    /// returned rows referencing the page's buffer.
-    bool retrieved_buffer = true;
+    /// returned rows referencing the page's buffer. This is atomic because multiple
+    /// iterators may call GetBuffer() concurrently.
+    AtomicBool retrieved_buffer{true};
 
     /// If the page was just attached to the output batch on the last GetNext() call while
     /// in attach_on_read mode. If true, then 'handle' is closed.
     bool attached_to_output_batch = false;
   };
 
+ public:
+  /// A read iterator for a tuple stream. This keeps track of the read position in the
+  /// tuple stream and is advanced forward while reading rows from the stream.
+  /// This can be used in two modes:
+  /// * As the built-in iterator for the stream. This supports various advanced
+  ///   functionality like reading unpinned streams, co-existing with a write iterator,
+  ///   etc.
+  /// * As an external iterator for a read-only pinned stream. In this case, multiple
+  ///   iterators can be used simultaneously and reading from the stream via different
+  ///   iterators is thread-safe (as long as each iterator is not accessed concurrently
+  ///   from different threads).
+  class ReadIterator {
+   public:
+    int64_t rows_returned() const { return rows_returned_; }
+    bool is_valid() const { return valid_; }
+
+    std::string DebugString(const std::list<Page>& pages) const;
+
+   private:
+    friend class BufferedTupleStream;
+
+    /// True if the read iterator is currently valid.
+    bool valid_ = false;
+
+    /// If true, pages are deleted after they are read via this read iterator. This is
+    /// only used for the embedded read iterator, i.e. BufferedTupleStream::read_it_.
+    /// Once rows have been read from a stream with 'attach_on_read_' true, this is
+    /// always true.
+    bool attach_on_read_ = false;
+
+    /// The current page being read. When no read iterator is active, equal to list.end().
+    /// When a read iterator is active, either points to the current read page, or equals
+    /// list.end() if no rows have yet been read.  GetNext() does not advance this past
+    /// the end of the stream, so upon eos 'read_page_' points to the last page and
+    /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed and an
+    /// error status was returned.
+    std::list<Page>::iterator read_page_;
+
+    /// Total number of rows returned via this read iterator since Init() was called.
+    int64_t rows_returned_ = 0;
+
+    /// Number of rows returned from the current 'read_page_'.
+    uint32_t read_page_rows_returned_ = -1;
+
+    /// Pointer into 'read_page_' to the byte after the last row read.
+    uint8_t* read_ptr_ = nullptr;
+
+    /// Pointer to one byte past the end of 'read_page_'. Used to detect overruns.
+    const uint8_t* read_end_ptr_ = nullptr;
+
+    /// Initializes a read iterator to a valid state.
+    void Init(bool attach_on_read);
+
+    /// Invalidate the read iterator.
+    void Reset(std::list<Page>* pages);
+
+    /// Sets the current read page and clears state from any previous read page.
+    /// 'read_page' may be pages.end() if there are no pages in the stream yet.
+    /// The iterator must be valid.
+    void SetReadPage(std::list<Page>::iterator read_page);
+
+    /// Set the read page to the next read page. This iterator must be valid and
+    /// 'read_page_' must point to a page, not pages_.end().
+    void AdvanceReadPage(const std::list<Page>& pages);
+
+    /// Prepares to read from pinned 'read_page_'. Called after SetReadPage()
+    /// or AdvanceReadPage().  Waits for the pin to complete if the page was
+    /// unpinned earlier.  Sets 'read_ptr_' and 'read_end_ptr_'.
+    Status InitReadPtrs();
+
+    /// Move forward 'read_ptr_' by 'bytes'. It is invalid to advance more than 1
+    /// byte past the end of the buffer.
+    void AdvanceReadPtr(int64_t bytes) {
+      DCHECK_GE(bytes, 0);
+      read_ptr_ += bytes;
+      DCHECK_LE(read_ptr_, read_end_ptr_);
+    }
+
+    /// Increment the number of rows returned from this page and in total.
+    void IncrRowsReturned(int64_t rows);
+
+    /// Return the number of rows left to read in 'read_page_'. This iterator must be
+    /// valid and 'read_page_' must point to a page, not pages_.end().
+    int64_t GetRowsLeftInPage() const {
+      DCHECK_GE(read_page_->num_rows, read_page_rows_returned_);
+      return read_page_->num_rows - read_page_rows_returned_;
+    }
+  };
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(BufferedTupleStream);
+  friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
+  friend class ArrayTupleStreamTest_TestComputeRowSize_Test;
+  friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test;
+
   /// Runtime state instance used to check for cancellation. Not owned.
   RuntimeState* const state_;
 
@@ -456,41 +584,20 @@ class BufferedTupleStream {
   /// mode.
   int64_t total_byte_size_ = 0;
 
-  /// True if there is currently an active read iterator for the stream.
-  bool has_read_iterator_ = false;
+  /// The built-in read iterator for the stream.
+  ReadIterator read_it_;
 
-  /// The current page being read. When no read iterator is active, equal to list.end().
-  /// When a read iterator is active, either points to the current read page, or equals
-  /// list.end() if no rows have yet been read.  GetNext() does not advance this past
-  /// the end of the stream, so upon eos 'read_page_' points to the last page and
-  /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed and an error
-  /// status was returned.
-  std::list<Page>::iterator read_page_;
-
-  /// Saved reservation for read iterator. 'default_page_len_' reservation is saved if
-  /// there is a read iterator, no pinned read page, and the possibility that the read
-  /// iterator will advance to a valid page.
+  /// Saved reservation for the embedded read iterator. 'default_page_len_' reservation
+  /// is saved if there is a read iterator, no pinned read page, and the possibility
+  /// that the read iterator will advance to a valid page.
   BufferPool::SubReservation read_page_reservation_;
 
-  /// Number of rows returned from the current read_page_.
-  uint32_t read_page_rows_returned_ = -1;
-
-  /// Pointer into read_page_ to the byte after the last row read.
-  uint8_t* read_ptr_ = nullptr;
-
-  /// Pointer to one byte past the end of read_page_. Used to detect overruns.
-  const uint8_t* read_end_ptr_ = nullptr;
-
   /// Pointer into write_page_ to the byte after the last row written.
   uint8_t* write_ptr_ = nullptr;
 
   /// Pointer to one byte past the end of write_page_. Cached to speed up computation
   const uint8_t* write_end_ptr_ = nullptr;
 
-  /// Number of rows returned to the caller from GetNext() since the last
-  /// PrepareForRead() call.
-  int64_t rows_returned_ = 0;
-
   /// True if there is currently an active write iterator into the stream.
   bool has_write_iterator_ = false;
 
@@ -532,20 +639,18 @@ class BufferedTupleStream {
   /// Whether any tuple in the rows is nullable.
   const bool has_nullable_tuple_;
 
-  /// If true, pages are deleted after they are read during this read pass. Once rows
-  /// have been read from a stream with 'attach_on_read_' true, this is always true.
-  bool attach_on_read_ = false;
-
   bool closed_ = false; // Used for debugging.
 
   /// If true, this stream has been explicitly pinned by the caller and all pages are
   /// kept pinned until the caller calls UnpinStream().
   bool pinned_ = true;
 
+  /// Return true if 'page' is the current page for the embedded read iterator.
   bool is_read_page(const Page* page) const {
-    return read_page_ != pages_.end() && &*read_page_ == page;
+    return read_it_.read_page_ != pages_.end() && &*read_it_.read_page_ == page;
   }
 
+  /// Return true if 'page' is the current page for the embedded write iterator.
   bool is_write_page(const Page* page) const { return write_page_ == page; }
 
   /// Return true if the read and write page are the same.
@@ -590,7 +695,7 @@ class BufferedTupleStream {
   /// Gets a new page of 'page_len' bytes from buffer_pool_, updating write_page_,
   /// write_ptr_ and write_end_ptr_. The caller must ensure there is 'page_len' unused
   /// reservation. The caller must reset the write page (if there is one) before calling.
-  Status NewWritePage(int64_t page_len) noexcept WARN_UNUSED_RESULT;
+  Status NewWritePage(int64_t page_len) noexcept;
 
   /// Determines what page size is needed to fit a row of 'row_size' bytes.
   /// Returns an error if the row cannot fit in a page.
@@ -603,7 +708,7 @@ class BufferedTupleStream {
   /// 'got_reservation' to false if the reservation could not be increased and no other
   /// error was encountered.
   Status AdvanceWritePage(
-      int64_t row_size, bool* got_reservation) noexcept WARN_UNUSED_RESULT;
+      int64_t row_size, bool* got_reservation) noexcept;
 
   /// Reset the write page, if there is one, and unpin pages accordingly. If there
   /// is an active write iterator, the next row will be appended to a new page.
@@ -613,19 +718,19 @@ class BufferedTupleStream {
   /// calling this, no more rows can be appended to the stream.
   void InvalidateWriteIterator();
 
-  /// Same as PrepareForRead(), except the iterators are not invalidated and
-  /// the caller is assumed to have checked there is sufficient unused reservation.
-  Status PrepareForReadInternal(bool attach_on_read) WARN_UNUSED_RESULT;
+  /// Helper for PrepareForRead() and PrepareForPinnedRead(). Sets up 'read_iter' for
+  /// reading from the start of the stream. Does not invalidate any existing iterators.
+  /// The caller must ensure there is sufficient unused reservation.
+  Status PrepareForReadInternal(bool attach_on_read, ReadIterator* read_iter);
 
-  /// Pins the next read page. This blocks reading from disk if necessary to bring the
-  /// page's data into memory. Updates read_page_, read_ptr_, and
-  /// read_page_rows_returned_.
-  Status NextReadPage() WARN_UNUSED_RESULT;
+  /// Pins the next read page for 'read_iter'. This blocks reading from disk if necessary
+  /// to bring the page's data into memory.
+  Status NextReadPage(ReadIterator* read_iter);
 
   /// Invalidate the read iterator, and release any resources associated with the active
-  /// iterator. Invalid to call if 'attach_on_read_' is true and >= 1 rows have been read,
-  /// because that would leave the stream in limbo where it still has pages but it is
-  /// invalid to read or write from in future.
+  /// iterator. Invalid to call if 'read_it_.attach_on_read_' is true and >= 1 rows have
+  /// been read, because that would leave the stream in limbo where it still has pages
+  /// but it is invalid to read or write from in future.
   void InvalidateReadIterator();
 
   /// Returns the total additional bytes that this row will consume in write_page_ if
@@ -634,14 +739,14 @@ class BufferedTupleStream {
   int64_t ComputeRowSize(TupleRow* row) const noexcept;
 
   /// Pins page and updates tracking stats.
-  Status PinPage(Page* page) WARN_UNUSED_RESULT;
+  Status PinPage(Page* page);
 
   /// Increment the page's pin count if this page needs a higher pin count given the
   /// current read and write iterator positions and whether the stream will be pinned
   /// ('stream_pinned'). Assumes that no scenarios occur when the pin count needs to
   /// be incremented multiple times. The caller is responsible for ensuring sufficient
   /// reservation is available.
-  Status PinPageIfNeeded(Page* page, bool stream_pinned) WARN_UNUSED_RESULT;
+  Status PinPageIfNeeded(Page* page, bool stream_pinned);
 
   /// Decrement the page's pin count if this page needs a lower pin count given the
   /// current read and write iterator positions and whether the stream will be pinned
@@ -686,9 +791,11 @@ class BufferedTupleStream {
 
   /// Templated GetNext implementations.
   template <bool FILL_FLAT_ROWS>
-  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows);
+  Status GetNextInternal(ReadIterator* RESTRICT read_iter, RowBatch* batch, bool* eos,
+      std::vector<FlatRowPtr>* flat_rows);
   template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE>
-  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows);
+  Status GetNextInternal(ReadIterator* RESTRICT read_iter, RowBatch* batch, bool* eos,
+      std::vector<FlatRowPtr>* flat_rows);
 
   /// Helper function to convert a flattened TupleRow stored starting at '*data' into
   /// 'row'. *data is updated to point to the first byte past the end of the row.
@@ -697,16 +804,18 @@ class BufferedTupleStream {
 
   /// Helper function for GetNextInternal(). For each string slot in string_slots,
   /// update StringValue's ptr field to point to the corresponding string data stored
-  /// inline in the stream (at the current value of read_ptr_) advance read_ptr_ by the
-  /// StringValue's length field.
-  void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, Tuple* tuple);
+  /// inline in the stream (at the current value of read_iter->read_ptr_) and advance
+  /// read_iter->read_ptr_ by the StringValue's length field.
+  static void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots,
+      ReadIterator* RESTRICT read_iter, Tuple* tuple);
 
   /// Helper function for GetNextInternal(). For each collection slot in collection_slots,
   /// recursively update any pointers in the CollectionValue to point to the corresponding
-  /// var len data stored inline in the stream, advancing read_ptr_ as data is read.
-  /// Assumes that the collection was serialized to the stream in DeepCopy()'s format.
-  void FixUpCollectionsForRead(
-      const vector<SlotDescriptor*>& collection_slots, Tuple* tuple);
+  /// var len data stored inline in the stream, advancing read_iter->read_ptr_ as data is
+  /// read.  Assumes that the collection was serialized to the stream in DeepCopy()'s
+  /// format.
+  static void FixUpCollectionsForRead(const vector<SlotDescriptor*>& collection_slots,
+      ReadIterator* RESTRICT read_iter, Tuple* tuple);
 
   /// Returns the number of null indicator bytes per row. Only valid if this stream has
   /// nullable tuples.
@@ -719,9 +828,11 @@ class BufferedTupleStream {
   /// a consistent state after returning success from a public API call. The Fast version
   /// has constant runtime and does not check all of 'pages_'. The Full version includes
   /// O(n) checks that require iterating over the whole 'pages_' list (e.g. checking that
-  /// each page is in a valid state).
-  void CheckConsistencyFast() const;
-  void CheckConsistencyFull() const;
+  /// each page is in a valid state). 'read_it' is a read iterator that is safe for
+  /// the current thread to access ('read_it_' in most cases, except when an external
+  /// read iterator is used).
+  void CheckConsistencyFast(const ReadIterator& read_it) const;
+  void CheckConsistencyFull(const ReadIterator& read_it) const;
   void CheckPageConsistency(const Page* page) const;
 };
 }
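
The header changes above split all per-reader state (current page, read pointers, row
counts) into the ReadIterator class, so a fully written, pinned stream can be scanned
concurrently through PrepareForPinnedRead() and GetNext(ReadIterator*, ...), with each
thread owning its own iterator. A minimal standalone sketch of that pattern under the
same assumption (immutable shared data, per-thread cursor), using hypothetical names
rather than Impala's API:

  // Sketch of per-thread read iterators over shared read-only data; hypothetical names.
  #include <cstdint>
  #include <iostream>
  #include <mutex>
  #include <thread>
  #include <vector>

  struct PinnedStream {
    std::vector<int64_t> rows;  // written once, then read-only
  };

  // All per-reader state lives here, mirroring ReadIterator's rows_returned_/read_ptr_.
  struct Reader {
    size_t pos = 0;
    // Returns false at end-of-stream.
    bool GetNext(const PinnedStream& s, int64_t* row) {
      if (pos >= s.rows.size()) return false;
      *row = s.rows[pos++];
      return true;
    }
  };

  int main() {
    PinnedStream stream;
    for (int64_t i = 0; i < 1000; ++i) stream.rows.push_back(i);

    std::mutex print_mutex;
    auto scan = [&](int id) {
      Reader it;  // each thread owns its iterator; the stream itself is shared read-only
      int64_t row, sum = 0, count = 0;
      while (it.GetNext(stream, &row)) { sum += row; ++count; }
      std::lock_guard<std::mutex> l(print_mutex);
      std::cout << "reader " << id << " read " << count << " rows, sum=" << sum << "\n";
    };

    std::thread t1(scan, 1), t2(scan, 2);
    t1.join();
    t2.join();
    return 0;
  }

Because the only shared state on the scan path is read-only, no locking is needed there;
each reader's position stays private to its own iterator.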
diff --git a/be/src/runtime/buffered-tuple-stream.inline.h b/be/src/runtime/buffered-tuple-stream.inline.h
index 2e1aad7..5de894d 100644
--- a/be/src/runtime/buffered-tuple-stream.inline.h
+++ b/be/src/runtime/buffered-tuple-stream.inline.h
@@ -51,6 +51,42 @@ inline uint8_t* BufferedTupleStream::AddRowCustomBegin(int64_t size, Status* sta
 inline void BufferedTupleStream::AddRowCustomEnd(int64_t size) {
   if (UNLIKELY(size > default_page_len_)) AddLargeRowCustomEnd(size);
 }
+
+inline void BufferedTupleStream::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const {
+  DCHECK(row != nullptr);
+  DCHECK(!closed_);
+  DCHECK(is_pinned());
+  DCHECK(!read_it_.attach_on_read_);
+  uint8_t* data = flat_row;
+  return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) :
+                               UnflattenTupleRow<false>(&data, row);
+}
+
+template <bool HAS_NULLABLE_TUPLE>
+inline void BufferedTupleStream::UnflattenTupleRow(uint8_t** data, TupleRow* row) const {
+  const int tuples_per_row = desc_->tuple_descriptors().size();
+  uint8_t* ptr = *data;
+  if (HAS_NULLABLE_TUPLE) {
+    // Stitch together the tuples from the page and the NULL ones.
+    const uint8_t* null_indicators = ptr;
+    ptr += NullIndicatorBytesPerRow();
+    for (int i = 0; i < tuples_per_row; ++i) {
+      const uint8_t* null_word = null_indicators + (i >> 3);
+      const uint32_t null_pos = i & 7;
+      const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
+      row->SetTuple(
+          i, reinterpret_cast<Tuple*>(reinterpret_cast<uint64_t>(ptr) * is_not_null));
+      ptr += fixed_tuple_sizes_[i] * is_not_null;
+    }
+  } else {
+    for (int i = 0; i < tuples_per_row; ++i) {
+      row->SetTuple(i, reinterpret_cast<Tuple*>(ptr));
+      ptr += fixed_tuple_sizes_[i];
+    }
+  }
+  *data = ptr;
+}
+
 }
 
 #endif
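
GetTupleRow() and UnflattenTupleRow() were moved into the .inline.h header above; the
nullable-tuple branch decodes one null-indicator bit per tuple and uses a branch-free
multiply so that NULL tuples become nullptr and contribute no bytes to the row. A small
self-contained sketch of that bit trick, with a hypothetical two-tuple row layout:

  // Sketch of the null-indicator decode and branch-free NULL pointer trick.
  #include <cstdint>
  #include <iostream>

  int main() {
    // Two 8-byte "tuples", the second flagged NULL. Bit (7 - i) of the first
    // null-indicator byte covers tuple i, matching the code above.
    uint8_t page[1 + 2 * 8] = {};
    page[0] = 1 << (7 - 1);  // mark tuple 1 as NULL
    const int fixed_tuple_size = 8;

    const uint8_t* null_indicators = page;
    const uint8_t* ptr = page + 1;  // one null-indicator byte covers up to 8 tuples
    for (int i = 0; i < 2; ++i) {
      const uint8_t* null_word = null_indicators + (i >> 3);
      const uint32_t null_pos = i & 7;
      const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
      // NULL tuples get nullptr; non-NULL tuples point into the page. No branch needed.
      const void* tuple = reinterpret_cast<const void*>(
          reinterpret_cast<uintptr_t>(ptr) * is_not_null);
      std::cout << "tuple " << i << (tuple == nullptr ? " is NULL\n" : " is present\n");
      ptr += fixed_tuple_size * is_not_null;  // NULL tuples occupy no space in the row
    }
    return 0;
  }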
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 8224179..c2caf7b 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -121,9 +121,11 @@ struct BufferPool::Page : public InternalList<Page>::Node {
   int pin_count;
 
   /// True if the read I/O to pin the page was started but not completed. Only accessed
-  /// in contexts that are passed the associated PageHandle, so it cannot be accessed
-  /// by multiple threads concurrently.
-  bool pin_in_flight;
+  /// in contexts that are passed the associated PageHandle, so it can only be accessed
+  /// concurrently by multiple threads via PageHandle::GetBuffer(), since other
+  /// PageHandle operations are not thread-safe. This is atomic so that GetBuffer() can
+  /// do optimistic checks to avoid acquiring 'buffer_lock'.
+  AtomicBool pin_in_flight;
 
   /// Non-null if there is a write in flight, the page is clean, or the page is evicted.
   std::unique_ptr<TmpWriteHandle> write_handle;
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 57b1046..ea788c4 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -355,7 +355,7 @@ class BufferPoolTest : public ::testing::Test {
 
   // Return whether a pin is in flight for the page.
   static bool PinInFlight(PageHandle* page) {
-    return page->page_->pin_in_flight;
+    return page->page_->pin_in_flight.Load();
   }
 
   // Return the path of the temporary file backing the page.
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 65af75d..1c900cb 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -97,11 +97,13 @@ Status BufferPool::PageHandle::GetBuffer(const BufferHandle** buffer) const {
   DCHECK(is_open());
   DCHECK(client_->is_registered());
   DCHECK(is_pinned());
-  if (page_->pin_in_flight) {
+  // Dirty check to see if we might need to wait for the pin to finish (this
+  // avoids the lock acquisition in the common case).
+  if (page_->pin_in_flight.Load()) {
     // Finish the work started in Pin().
     RETURN_IF_ERROR(client_->impl_->FinishMoveEvictedToPinned(page_));
   }
-  DCHECK(!page_->pin_in_flight);
+  DCHECK(!page_->pin_in_flight.Load());
   *buffer = &page_->buffer;
   DCHECK((*buffer)->is_open());
   return Status::OK();
@@ -156,9 +158,9 @@ void BufferPool::DestroyPage(ClientHandle* client, PageHandle* handle) {
 
   if (handle->is_pinned()) {
     // Cancel the read I/O - we don't need the data any more.
-    if (handle->page_->pin_in_flight) {
+    if (handle->page_->pin_in_flight.Load()) {
       handle->page_->write_handle->CancelRead();
-      handle->page_->pin_in_flight = false;
+      handle->page_->pin_in_flight.Store(false);
     }
     // In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do the work
     // of cleaning up the page, freeing the buffer and updating reservations correctly.
@@ -199,7 +201,7 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) {
   reservation->ReleaseTo(page->len);
 
   if (--page->pin_count > 0) return;
-  if (page->pin_in_flight) {
+  if (page->pin_in_flight.Load()) {
     // Data is not in memory - move it back to evicted.
     client->impl_->UndoMoveEvictedToPinned(page);
   } else {
@@ -550,7 +552,7 @@ Status BufferPool::Client::StartMoveEvictedToPinned(
   RETURN_IF_ERROR(
       file_group_->ReadAsync(page->write_handle.get(), page->buffer.mem_range()));
   pinned_pages_.Enqueue(page);
-  page->pin_in_flight = true;
+  page->pin_in_flight.Store(true);
   DCHECK_CONSISTENCY();
   return Status::OK();
 }
@@ -560,9 +562,9 @@ void BufferPool::Client::UndoMoveEvictedToPinned(Page* page) {
   // * There is no in-flight read.
   // * The page's data is on disk referenced by 'write_handle'
   // * The page has no attached buffer.
-  DCHECK(page->pin_in_flight);
+  DCHECK(page->pin_in_flight.Load());
   page->write_handle->CancelRead();
-  page->pin_in_flight = false;
+  page->pin_in_flight.Store(false);
 
   unique_lock<mutex> lock(lock_);
   DCHECK_CONSISTENCY();
@@ -574,15 +576,17 @@ void BufferPool::Client::UndoMoveEvictedToPinned(Page* page) {
 }
 
 Status BufferPool::Client::FinishMoveEvictedToPinned(Page* page) {
-  DCHECK(page->pin_in_flight);
   SCOPED_TIMER(counters().read_wait_time);
+  lock_guard<SpinLock> pl(page->buffer_lock);
+  // Another thread may have moved it to pinned in the meantime.
+  if (!page->pin_in_flight.Load()) return Status::OK();
   // Don't hold any locks while reading back the data. It is safe to modify the page's
   // buffer handle without holding any locks because no concurrent operations can modify
   // evicted pages.
   RETURN_IF_ERROR(file_group_->WaitForAsyncRead(
       page->write_handle.get(), page->buffer.mem_range(), &counters_));
   file_group_->DestroyWriteHandle(move(page->write_handle));
-  page->pin_in_flight = false;
+  page->pin_in_flight.Store(false);
   return Status::OK();
 }
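
With pin_in_flight now an AtomicBool, GetBuffer() can do a cheap optimistic load and only
fall into FinishMoveEvictedToPinned(), which takes 'buffer_lock' and re-checks the flag,
when the pin might still be outstanding; another thread may have completed it in the
meantime. A standalone sketch of that check, lock, re-check pattern with hypothetical
names:

  // Sketch of the optimistic atomic check followed by a re-check under a lock.
  #include <atomic>
  #include <iostream>
  #include <mutex>
  #include <thread>

  std::atomic<bool> work_in_flight{true};
  std::mutex work_lock;
  int result = 0;

  void FinishWorkIfNeeded() {
    // Dirty check: avoids the lock acquisition in the common case where the work
    // was already completed.
    if (!work_in_flight.load()) return;
    std::lock_guard<std::mutex> l(work_lock);
    // Re-check under the lock: another thread may have completed the work after our
    // optimistic check but before we acquired the lock.
    if (!work_in_flight.load()) return;
    result = 42;  // stand-in for waiting on the async read and installing the buffer
    work_in_flight.store(false);
  }

  int main() {
    std::thread t1(FinishWorkIfNeeded), t2(FinishWorkIfNeeded);
    t1.join();
    t2.join();
    std::cout << "result=" << result << " in_flight=" << work_in_flight.load() << "\n";
    return 0;
  }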
 
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index a35d8f3..6c81b2a 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -442,9 +442,10 @@ class BufferPool::SubReservation {
   boost::scoped_ptr<ReservationTracker> tracker_;
 };
 
-/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only
-/// be used by a single thread at a time: concurrently calling BufferHandle methods or
-/// BufferPool methods with the BufferHandle as an argument is not supported.
+/// A handle to a buffer allocated from the buffer pool. Only const methods on
+/// BufferHandle are thread-safe. It is not safe to call non-constant BufferHandle
+/// methods or BufferPool methods with the BufferHandle as an argument concurrently
+/// with any other operations on the BufferHandle.
 class BufferPool::BufferHandle {
  public:
   BufferHandle() { Reset(); }
@@ -508,9 +509,10 @@ class BufferPool::BufferHandle {
   int home_core_;
 };
 
-/// The handle for a page used by clients of the BufferPool. Each PageHandle should
-/// only be used by a single thread at a time: concurrently calling PageHandle methods
-/// or BufferPool methods with the PageHandle as an argument is not supported.
+/// The handle for a page used by clients of the BufferPool. Only const methods on
+/// PageHandle are thread-safe. It is not safe to call non-constant PageHandle
+/// methods or BufferPool methods with the PageHandle as an argument concurrently
+/// with any other operations on the PageHandle.
 class BufferPool::PageHandle {
  public:
   PageHandle();
@@ -535,6 +537,9 @@ class BufferPool::PageHandle {
   /// since the last call to GetBuffer(). Only const accessors of the returned handle can
   /// be used: it is invalid to call FreeBuffer() or TransferBuffer() on it or to
   /// otherwise modify the handle.
+  ///
+  /// This is safe to call from multiple threads at the same time as long as the
+  /// page is pinned.
   Status GetBuffer(const BufferHandle** buffer_handle) const WARN_UNUSED_RESULT;
 
   std::string DebugString() const;
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index dfeb067..4e5f5a0 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -86,6 +86,7 @@ PRINT_THRIFT_ENUM_IMPL(THdfsFileFormat)
 PRINT_THRIFT_ENUM_IMPL(THdfsSeqCompressionMode)
 PRINT_THRIFT_ENUM_IMPL(TImpalaQueryOptions)
 PRINT_THRIFT_ENUM_IMPL(TJoinDistributionMode)
+PRINT_THRIFT_ENUM_IMPL(TJoinOp)
 PRINT_THRIFT_ENUM_IMPL(TKuduReadMode)
 PRINT_THRIFT_ENUM_IMPL(TMetricKind)
 PRINT_THRIFT_ENUM_IMPL(TParquetArrayResolution)
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index ab0e743..eaaccf6 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -62,6 +62,7 @@ std::string PrintThriftEnum(const THdfsFileFormat::type& value);
 std::string PrintThriftEnum(const THdfsSeqCompressionMode::type& value);
 std::string PrintThriftEnum(const TImpalaQueryOptions::type& value);
 std::string PrintThriftEnum(const TJoinDistributionMode::type& value);
+std::string PrintThriftEnum(const TJoinOp::type& value);
 std::string PrintThriftEnum(const TKuduReadMode::type& value);
 std::string PrintThriftEnum(const TMetricKind::type& value);
 std::string PrintThriftEnum(const TParquetArrayResolution::type& value);
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 5c774ce..e840b7f 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -454,6 +454,11 @@ error_codes = (
   ("ORC_TYPE_NOT_ROOT_AT_STRUCT", 148,
    "Root of the $0 type returned by the ORC lib is not STRUCT: $1. "
    "Either there are bugs in the ORC lib or ORC file '$2' is corrupt."),
+
+  ("NAAJ_OUT_OF_MEMORY", 149,
+   "Unable to perform Null-Aware Anti-Join. Could not get enough reservation to fit "
+   "all rows with NULLs from the build side in memory. Memory required for $0 rows "
+   "was $1. $2/$3 of the join's reservation was available for the rows."),
 )
 
 import sys
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 8ee860e..0bb77aa 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -261,7 +261,6 @@ public class HashJoinNode extends JoinNode {
       // probe streams when spilling. mt_dop is an upper bound on the number of
       // PartitionedHashJoinNodes per builder.
       // TODO: IMPALA-9416: be less conservative here
-      // TODO: how did we not detect the reservation bug here with spilling?
       perInstanceBuildMinMemReservation *= queryOptions.mt_dop;
     }
     // Most reservation for probe buffers is obtained from the join builder when
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index bccf900..0a3a708 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -194,8 +194,6 @@ public abstract class JoinNode extends PlanNode {
   // Returns true if we can share a join build between multiple consuming fragment
   // instances.
   public boolean canShareBuild() {
-    // TODO: IMPALA-9176: null-aware anti-join doesn't support join build sharing.
-    if (joinOp_ == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) return false;
     return distrMode_ == JoinNode.DistributionMode.BROADCAST;
   }
 
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index c5592f8..dc1d795 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -3652,8 +3652,8 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> ps_partkey
    row-size=16B cardinality=800.00K
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=50.19MB Threads=8
-Per-Host Resource Estimates: Memory=201MB
+Max Per-Host Resource Reservation: Memory=52.19MB Threads=8
+Per-Host Resource Estimates: Memory=203MB
 PLAN-ROOT SINK
 |
 14:MERGING-EXCHANGE [UNPARTITIONED]
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj-no-deny-reservation.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj-no-deny-reservation.test
index 7678675..1db2764 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj-no-deny-reservation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-naaj-no-deny-reservation.test
@@ -54,6 +54,7 @@ order by 1,2,3,4
 BIGINT,BIGINT,BIGINT,INT
 ====
 ---- QUERY
+set max_row_size=64k;
 set debug_action="-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0";
 # Execute NAAJ query 1 with a debug action so that it can not get enough memory to fit
 # the null build rows in memory.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
index b370349..3ebcea7 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
@@ -55,7 +55,6 @@ row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
 # Setting max_row_size == default_spillable_buffer_size was sufficient to trigger
 # IMPALA-9349, because it means there is no surplus reservation during repartitioning.
 set debug_action="-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0";
-set max_row_size=256k;
 select straight_join count(*)
 from
 supplier right outer join lineitem on s_suppkey = l_suppkey
@@ -84,7 +83,7 @@ row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
 ---- QUERY
 # Aggregation query that will OOM and fail to spill because of IMPALA-3304 without
 # any help from DEBUG_ACTION.
-set mem_limit=80m;
+set mem_limit=75m;
 select l_orderkey, group_concat(l_comment) comments
 from lineitem
 group by l_orderkey
diff --git a/tests/query_test/test_spilling.py b/tests/query_test/test_spilling.py
index 035d79d..818ba45 100644
--- a/tests/query_test/test_spilling.py
+++ b/tests/query_test/test_spilling.py
@@ -113,7 +113,7 @@ class TestSpillingNoDebugActionDimensions(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(create_parquet_dimension('tpch'))
     # Tests are calibrated so that they can execute and spill with this page size.
     cls.ImpalaTestMatrix.add_dimension(
-        create_exec_option_dimension_from_dict({'default_spillable_buffer_size': ['256k'],
+        create_exec_option_dimension_from_dict({'default_spillable_buffer_size': ['64k'],
             'mt_dop': [0, 4]}))
 
   def test_spilling_naaj_no_deny_reservation(self, vector):