You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/10 23:35:01 UTC

[2/5] impala git commit: IMPALA-7231: group plan nodes into pipelines

http://git-wip-us.apache.org/repos/asf/impala/blob/b7d509d7/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 5a2e1c5..439c1ed 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -17,6 +17,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=40.00MB Threads=3
 Per-Host Resource Estimates: Memory=80MB
@@ -29,6 +30,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=263B cardinality=6001215
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=2
@@ -40,6 +42,7 @@ Per-Host Resources: mem-estimate=80.00MB mem-reservation=40.00MB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=80.00MB Threads=3
 Per-Host Resource Estimates: Memory=160MB
@@ -52,6 +55,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=263B cardinality=6001215
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=160.00MB mem-reservation=80.00MB thread-reservation=2
@@ -63,6 +67,7 @@ Per-Host Resources: mem-estimate=160.00MB mem-reservation=80.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=0
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Single column parquet scan - memory reservation is reduced compared to multi-column
 # scan.
@@ -84,6 +89,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
    tuple-ids=0 row-size=42B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=4.00MB Threads=3
 Per-Host Resource Estimates: Memory=80MB
@@ -96,6 +102,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=42B cardinality=6001215
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=2
@@ -107,6 +114,7 @@ Per-Host Resources: mem-estimate=80.00MB mem-reservation=4.00MB thread-reservati
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
    tuple-ids=0 row-size=42B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=3
 Per-Host Resource Estimates: Memory=160MB
@@ -119,6 +127,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=42B cardinality=6001215
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=160.00MB mem-reservation=8.00MB thread-reservation=2
@@ -130,6 +139,7 @@ Per-Host Resources: mem-estimate=160.00MB mem-reservation=8.00MB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=0
    tuple-ids=0 row-size=42B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Small parquet files - memory reservation is reduced because of small file size.
 select string_col from functional_parquet.alltypes;
@@ -145,7 +155,7 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=188.29KB
+   partitions=24/24 files=24 size=188.92KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -153,6 +163,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
    tuple-ids=0 row-size=16B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=16.00KB Threads=3
 Per-Host Resource Estimates: Memory=16MB
@@ -167,11 +178,12 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=16B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=2
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   partitions=24/24 files=24 size=188.29KB
+   partitions=24/24 files=24 size=188.92KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -179,6 +191,7 @@ Per-Host Resources: mem-estimate=16.00MB mem-reservation=16.00KB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=1
    tuple-ids=0 row-size=16B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=32.00KB Threads=3
 Per-Host Resource Estimates: Memory=32MB
@@ -193,11 +206,12 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=16B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=32.00MB mem-reservation=32.00KB thread-reservation=2
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   partitions=24/24 files=24 size=188.29KB
+   partitions=24/24 files=24 size=188.92KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -205,6 +219,7 @@ Per-Host Resources: mem-estimate=32.00MB mem-reservation=32.00KB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
    tuple-ids=0 row-size=16B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ====
 # Multi-column parquet scan with small files - memory reservation is reduced because of
 # small file size but a minimum amount is reserved per column.
@@ -221,7 +236,7 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=188.29KB
+   partitions=24/24 files=24 size=188.92KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -229,6 +244,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=1
    tuple-ids=0 row-size=24B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=24.00KB Threads=3
 Per-Host Resource Estimates: Memory=16MB
@@ -243,11 +259,12 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=24B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=2
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   partitions=24/24 files=24 size=188.29KB
+   partitions=24/24 files=24 size=188.92KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -255,6 +272,7 @@ Per-Host Resources: mem-estimate=16.00MB mem-reservation=24.00KB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=1
    tuple-ids=0 row-size=24B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=48.00KB Threads=3
 Per-Host Resource Estimates: Memory=32MB
@@ -269,11 +287,12 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=24B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=32.00MB mem-reservation=48.00KB thread-reservation=2
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   partitions=24/24 files=24 size=188.29KB
+   partitions=24/24 files=24 size=188.92KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -281,6 +300,7 @@ Per-Host Resources: mem-estimate=32.00MB mem-reservation=48.00KB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=24.00KB thread-reservation=0
    tuple-ids=0 row-size=24B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet scan with no materialized columns. Need reservation to scan levels to determine
 # row count.
@@ -297,7 +317,7 @@ PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [functional_parquet.alltypes]
-   partitions=24/24 files=24 size=188.29KB
+   partitions=24/24 files=24 size=188.92KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -305,6 +325,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=1.00MB mem-reservation=16.00KB thread-reservation=1
    tuple-ids=0 row-size=0B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=16.00KB Threads=3
 Per-Host Resource Estimates: Memory=10MB
@@ -319,11 +340,12 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=0B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=1.00MB mem-reservation=16.00KB thread-reservation=2
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   partitions=24/24 files=24 size=188.29KB
+   partitions=24/24 files=24 size=188.92KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -331,6 +353,7 @@ Per-Host Resources: mem-estimate=1.00MB mem-reservation=16.00KB thread-reservati
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=1.00MB mem-reservation=16.00KB thread-reservation=1
    tuple-ids=0 row-size=0B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=32.00KB Threads=3
 Per-Host Resource Estimates: Memory=32MB
@@ -345,11 +368,12 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=0B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=32.00MB mem-reservation=32.00KB thread-reservation=2
 00:SCAN HDFS [functional_parquet.alltypes, RANDOM]
-   partitions=24/24 files=24 size=188.29KB
+   partitions=24/24 files=24 size=188.92KB
    stored statistics:
      table: rows=unavailable size=unavailable
      partitions: 0/24 rows=unavailable
@@ -357,6 +381,7 @@ Per-Host Resources: mem-estimate=32.00MB mem-reservation=32.00KB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=16.00KB thread-reservation=0
    tuple-ids=0 row-size=0B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet scan - reservation estimate based on dictionary-encoded size reduces
 # reservation for column with low NDV.
@@ -378,6 +403,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=150000
    mem-estimate=24.00MB mem-reservation=128.00KB thread-reservation=1
    tuple-ids=0 row-size=2B cardinality=150000
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet scan - reservation estimate based on uncompressed size reduces reservation
 # for fixed-width column with high NDV.
@@ -399,6 +425,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=150000
    mem-estimate=24.00MB mem-reservation=2.00MB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=150000
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet scan - reservation estimate is not reduced based on column stats when they
 # are not present (c_mktsegment is a low NDV column so we can reduce our estimate of
@@ -423,6 +450,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=512.00KB thread-reservation=1
    tuple-ids=0 row-size=16B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet nested types, unnested in scan - should reserve memory for each column.
 select o_orderkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk
@@ -444,6 +472,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=44229
    mem-estimate=88.00MB mem-reservation=24.00MB thread-reservation=1
    tuple-ids=0 row-size=80B cardinality=1500000
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet nested types, unnested in scan - don't reserve extra memory for "pos" virtual
 # column that piggy-backs on another column.
@@ -466,6 +495,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=44229
    mem-estimate=88.00MB mem-reservation=4.00MB thread-reservation=1
    tuple-ids=0 row-size=16B cardinality=1500000
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet nested types, unnested in scan - reserve memory for "pos" virtual column if it
 # is the only column materialized.
@@ -488,6 +518,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=44229
    mem-estimate=88.00MB mem-reservation=4.00MB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=1500000
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet nested types, left nested in scan - should reserve memory for each scalar
 # column in the nested collection plus the top-level column.
@@ -508,20 +539,24 @@ PLAN-ROOT SINK
 01:SUBPLAN
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1,0 row-size=104B cardinality=1500000
+|  in pipelines: 00(GETNEXT)
 |
 |--04:NESTED LOOP JOIN [CROSS JOIN]
 |  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1,0 row-size=104B cardinality=10
+|  |  in pipelines: 00(GETNEXT)
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
 |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |     tuple-ids=0 row-size=24B cardinality=1
+|  |     in pipelines: 00(GETNEXT)
 |  |
 |  03:UNNEST [c.c_orders]
 |     parent-subplan=01
 |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |     tuple-ids=1 row-size=0B cardinality=10
+|     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
    partitions=1/1 files=4 size=288.98MB
@@ -532,6 +567,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=44229
    mem-estimate=88.00MB mem-reservation=32.00MB thread-reservation=1
    tuple-ids=0 row-size=24B cardinality=150000
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet nested types, left nested in scan - should reserve memory for each scalar
 # column (excluding pos) in the nested collection plus the top-level column.
@@ -551,20 +587,24 @@ PLAN-ROOT SINK
 01:SUBPLAN
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1,0 row-size=40B cardinality=1500000
+|  in pipelines: 00(GETNEXT)
 |
 |--04:NESTED LOOP JOIN [CROSS JOIN]
 |  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1,0 row-size=40B cardinality=10
+|  |  in pipelines: 00(GETNEXT)
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
 |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |     tuple-ids=0 row-size=24B cardinality=1
+|  |     in pipelines: 00(GETNEXT)
 |  |
 |  03:UNNEST [c.c_orders]
 |     parent-subplan=01
 |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |     tuple-ids=1 row-size=0B cardinality=10
+|     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
    partitions=1/1 files=4 size=288.98MB
@@ -575,6 +615,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=44229
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=24B cardinality=150000
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet nested types, left nested in scan - should reserve memory for virtual
 # pos column in the nested collection because nothing else is materialized from
@@ -595,20 +636,24 @@ PLAN-ROOT SINK
 01:SUBPLAN
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1,0 row-size=32B cardinality=1500000
+|  in pipelines: 00(GETNEXT)
 |
 |--04:NESTED LOOP JOIN [CROSS JOIN]
 |  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1,0 row-size=32B cardinality=10
+|  |  in pipelines: 00(GETNEXT)
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
 |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |     tuple-ids=0 row-size=24B cardinality=1
+|  |     in pipelines: 00(GETNEXT)
 |  |
 |  03:UNNEST [c.c_orders]
 |     parent-subplan=01
 |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |     tuple-ids=1 row-size=0B cardinality=10
+|     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
    partitions=1/1 files=4 size=288.98MB
@@ -619,6 +664,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=44229
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=24B cardinality=150000
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet nested types, left nested in scan - should reserve memory for nested column
 # plus top-level column even though nested fields are not materialized because the
@@ -639,20 +685,24 @@ PLAN-ROOT SINK
 01:SUBPLAN
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1,0 row-size=24B cardinality=1500000
+|  in pipelines: 00(GETNEXT)
 |
 |--04:NESTED LOOP JOIN [CROSS JOIN]
 |  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1,0 row-size=24B cardinality=10
+|  |  in pipelines: 00(GETNEXT)
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
 |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |     tuple-ids=0 row-size=24B cardinality=1
+|  |     in pipelines: 00(GETNEXT)
 |  |
 |  03:UNNEST [c.c_orders]
 |     parent-subplan=01
 |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |     tuple-ids=1 row-size=0B cardinality=10
+|     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
    partitions=1/1 files=4 size=288.98MB
@@ -663,6 +713,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=44229
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=24B cardinality=150000
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet nested types, left nested in scan - should reserve memory for nested column
 # only when no top-level column is materialized.
@@ -682,20 +733,24 @@ PLAN-ROOT SINK
 01:SUBPLAN
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1,0 row-size=24B cardinality=1500000
+|  in pipelines: 00(GETNEXT)
 |
 |--04:NESTED LOOP JOIN [CROSS JOIN]
 |  |  mem-estimate=16B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1,0 row-size=24B cardinality=10
+|  |  in pipelines: 00(GETNEXT)
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
 |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |     tuple-ids=0 row-size=16B cardinality=1
+|  |     in pipelines: 00(GETNEXT)
 |  |
 |  03:UNNEST [c.c_orders]
 |     parent-subplan=01
 |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |     tuple-ids=1 row-size=0B cardinality=10
+|     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
    partitions=1/1 files=4 size=288.98MB
@@ -706,6 +761,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=44229
    mem-estimate=88.00MB mem-reservation=4.00MB thread-reservation=1
    tuple-ids=0 row-size=16B cardinality=150000
+   in pipelines: 00(GETNEXT)
 ====
 # Parquet nested types with two levels of nesting materialized in scan. Should
 # reserve memory for columns at all three levels
@@ -725,38 +781,46 @@ PLAN-ROOT SINK
 01:SUBPLAN
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=2,1,0 row-size=64B cardinality=15000000
+|  in pipelines: 00(GETNEXT)
 |
 |--08:NESTED LOOP JOIN [CROSS JOIN]
 |  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=2,1,0 row-size=64B cardinality=100
+|  |  in pipelines: 00(GETNEXT)
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |     parent-subplan=01
 |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |     tuple-ids=0 row-size=24B cardinality=1
+|  |     in pipelines: 00(GETNEXT)
 |  |
 |  04:SUBPLAN
 |  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=2,1 row-size=40B cardinality=100
+|  |  in pipelines: 00(GETNEXT)
 |  |
 |  |--07:NESTED LOOP JOIN [CROSS JOIN]
 |  |  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
 |  |  |  tuple-ids=2,1 row-size=40B cardinality=10
+|  |  |  in pipelines: 00(GETNEXT)
 |  |  |
 |  |  |--05:SINGULAR ROW SRC
 |  |  |     parent-subplan=04
 |  |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |  |     tuple-ids=1 row-size=24B cardinality=1
+|  |  |     in pipelines: 00(GETNEXT)
 |  |  |
 |  |  06:UNNEST [o.o_lineitems]
 |  |     parent-subplan=04
 |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |     tuple-ids=2 row-size=0B cardinality=10
+|  |     in pipelines: 00(GETNEXT)
 |  |
 |  03:UNNEST [c.c_orders o]
 |     parent-subplan=01
 |     mem-estimate=0B mem-reservation=0B thread-reservation=0
 |     tuple-ids=1 row-size=0B cardinality=10
+|     in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
    partitions=1/1 files=4 size=288.98MB
@@ -768,6 +832,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=44229
    mem-estimate=88.00MB mem-reservation=16.00MB thread-reservation=1
    tuple-ids=0 row-size=24B cardinality=150000
+   in pipelines: 00(GETNEXT)
 ====
 # Text scan
 select * from tpch.lineitem
@@ -788,6 +853,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=3
 Per-Host Resource Estimates: Memory=88MB
@@ -800,6 +866,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=263B cardinality=6001215
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=2
@@ -811,6 +878,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservati
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=16.00MB Threads=3
 Per-Host Resource Estimates: Memory=176MB
@@ -823,6 +891,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=263B cardinality=6001215
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reservation=2
@@ -834,6 +903,7 @@ Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Single column text scan - memory reservation is same as multi-column scan.
 select l_comment from tpch.lineitem
@@ -854,6 +924,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=42B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=3
 Per-Host Resource Estimates: Memory=88MB
@@ -866,6 +937,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=42B cardinality=6001215
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=2
@@ -877,6 +949,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservati
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=42B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=16.00MB Threads=3
 Per-Host Resource Estimates: Memory=176MB
@@ -889,6 +962,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=42B cardinality=6001215
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reservation=2
@@ -900,6 +974,7 @@ Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
    tuple-ids=0 row-size=42B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Text scan on small files - memory reservation is reduced.
 select * from functional.alltypes
@@ -922,6 +997,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=310
    mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=97B cardinality=7300
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=32.00KB Threads=3
 Per-Host Resource Estimates: Memory=16MB
@@ -935,6 +1011,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=97B cardinality=7300
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=2
@@ -947,6 +1024,7 @@ Per-Host Resources: mem-estimate=16.00MB mem-reservation=32.00KB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=310
    mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=97B cardinality=7300
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=64.00KB Threads=3
 Per-Host Resource Estimates: Memory=32MB
@@ -960,6 +1038,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=97B cardinality=7300
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=32.00MB mem-reservation=64.00KB thread-reservation=2
@@ -972,6 +1051,7 @@ Per-Host Resources: mem-estimate=32.00MB mem-reservation=64.00KB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=310
    mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=0
    tuple-ids=0 row-size=97B cardinality=7300
+   in pipelines: 00(GETNEXT)
 ====
 # Avro scan.
 select * from tpch_avro.orders
@@ -994,6 +1074,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=108B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=3
 Per-Host Resource Estimates: Memory=88MB
@@ -1008,6 +1089,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=108B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=2
@@ -1019,6 +1101,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservati
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=108B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=16.00MB Threads=5
 Per-Host Resource Estimates: Memory=176MB
@@ -1033,6 +1116,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=108B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
 Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reservation=4
@@ -1044,6 +1128,7 @@ Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=108B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ====
 # RC scan.
 select * from tpch_rc.customer
@@ -1066,6 +1151,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=32.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=98B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=3
 Per-Host Resource Estimates: Memory=32MB
@@ -1080,6 +1166,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=98B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 Per-Host Resources: mem-estimate=32.00MB mem-reservation=8.00MB thread-reservation=2
@@ -1091,6 +1178,7 @@ Per-Host Resources: mem-estimate=32.00MB mem-reservation=8.00MB thread-reservati
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=32.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=98B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=16.00MB Threads=5
 Per-Host Resource Estimates: Memory=64MB
@@ -1105,6 +1193,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=98B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
 Per-Host Resources: mem-estimate=64.00MB mem-reservation=16.00MB thread-reservation=4
@@ -1116,6 +1205,7 @@ Per-Host Resources: mem-estimate=64.00MB mem-reservation=16.00MB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=32.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=98B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ====
 # Seq scan.
 select * from tpcds_seq_snap.web_returns
@@ -1138,6 +1228,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=104B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=3
 Per-Host Resource Estimates: Memory=16MB
@@ -1152,6 +1243,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=104B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 Per-Host Resources: mem-estimate=16.00MB mem-reservation=8.00MB thread-reservation=2
@@ -1163,6 +1255,7 @@ Per-Host Resources: mem-estimate=16.00MB mem-reservation=8.00MB thread-reservati
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=104B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=16.00MB Threads=5
 Per-Host Resource Estimates: Memory=32MB
@@ -1177,6 +1270,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=104B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
 Per-Host Resources: mem-estimate=32.00MB mem-reservation=16.00MB thread-reservation=4
@@ -1188,6 +1282,7 @@ Per-Host Resources: mem-estimate=32.00MB mem-reservation=16.00MB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=104B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ====
 # ORC scan
 select * from tpch_orc_def.lineitem
@@ -1208,6 +1303,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=1075682
    mem-estimate=40.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Single column ORC scan - memory reservation is same as multi-column scan.
 select l_comment from tpch_orc_def.lineitem
@@ -1228,6 +1324,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=1075682
    mem-estimate=40.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=42B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # ORC scan on small files - memory reservation is reduced.
 select * from functional_orc_def.alltypes
@@ -1251,6 +1348,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=88B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ====
 # Mixed table format scan
 select * from functional.alltypesmixedformat
@@ -1274,6 +1372,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=88B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=88.00KB Threads=3
 Per-Host Resource Estimates: Memory=16MB
@@ -1288,6 +1387,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=88B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=2
@@ -1300,6 +1400,7 @@ Per-Host Resources: mem-estimate=16.00MB mem-reservation=88.00KB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=88B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=176.00KB Threads=5
 Per-Host Resource Estimates: Memory=32MB
@@ -1314,6 +1415,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=88B cardinality=unavailable
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=32.00MB mem-reservation=176.00KB thread-reservation=4
@@ -1326,6 +1428,7 @@ Per-Host Resources: mem-estimate=32.00MB mem-reservation=176.00KB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=88B cardinality=unavailable
+   in pipelines: 00(GETNEXT)
 ====
 # HBase scan
 select * from functional_hbase.alltypes
@@ -1346,6 +1449,7 @@ PLAN-ROOT SINK
      columns: unavailable
    mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=88B cardinality=14298
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=0B Threads=2
 Per-Host Resource Estimates: Memory=1.00GB
@@ -1360,6 +1464,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=88B cardinality=14298
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 Per-Host Resources: mem-estimate=1.00GB mem-reservation=0B thread-reservation=1
@@ -1369,6 +1474,7 @@ Per-Host Resources: mem-estimate=1.00GB mem-reservation=0B thread-reservation=1
      columns: unavailable
    mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=88B cardinality=14298
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=0B Threads=3
 Per-Host Resource Estimates: Memory=2.00GB
@@ -1383,6 +1489,7 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=88B cardinality=14298
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
 Per-Host Resources: mem-estimate=2.00GB mem-reservation=0B thread-reservation=2
@@ -1392,6 +1499,7 @@ Per-Host Resources: mem-estimate=2.00GB mem-reservation=0B thread-reservation=2
      columns: unavailable
    mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=88B cardinality=14298
+   in pipelines: 00(GETNEXT)
 ====
 # Data source scan
 select * from functional.alltypes_datasource
@@ -1409,6 +1517,7 @@ PLAN-ROOT SINK
 00:SCAN DATA SOURCE [functional.alltypes_datasource]
    mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=116B cardinality=5000
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=0B Threads=2
 Per-Host Resource Estimates: Memory=1.00GB
@@ -1423,12 +1532,14 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=116B cardinality=5000
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 Per-Host Resources: mem-estimate=1.00GB mem-reservation=0B thread-reservation=1
 00:SCAN DATA SOURCE [functional.alltypes_datasource]
    mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=116B cardinality=5000
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=0B Threads=3
 Per-Host Resource Estimates: Memory=2.00GB
@@ -1443,12 +1554,14 @@ PLAN-ROOT SINK
 01:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=116B cardinality=5000
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=2
 Per-Host Resources: mem-estimate=2.00GB mem-reservation=0B thread-reservation=2
 00:SCAN DATA SOURCE [functional.alltypes_datasource]
    mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=116B cardinality=5000
+   in pipelines: 00(GETNEXT)
 ====
 # Union
 select * from tpch.lineitem
@@ -1467,6 +1580,7 @@ PLAN-ROOT SINK
 |  pass-through-operands: all
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=2 row-size=263B cardinality=12002430
+|  in pipelines: 01(GETNEXT), 02(GETNEXT)
 |
 |--02:SCAN HDFS [tpch.lineitem]
 |     partitions=1/1 files=1 size=718.94MB
@@ -1476,6 +1590,7 @@ PLAN-ROOT SINK
 |     extrapolated-rows=disabled max-scan-range-rows=1068457
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=263B cardinality=6001215
+|     in pipelines: 02(GETNEXT)
 |
 01:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
@@ -1485,6 +1600,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 01(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=3
 Per-Host Resource Estimates: Memory=88MB
@@ -1497,6 +1613,7 @@ PLAN-ROOT SINK
 03:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=2 row-size=263B cardinality=12002430
+|  in pipelines: 01(GETNEXT), 02(GETNEXT)
 |
 F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=2
@@ -1504,6 +1621,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservati
 |  pass-through-operands: all
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=2 row-size=263B cardinality=12002430
+|  in pipelines: 01(GETNEXT), 02(GETNEXT)
 |
 |--02:SCAN HDFS [tpch.lineitem, RANDOM]
 |     partitions=1/1 files=1 size=718.94MB
@@ -1513,6 +1631,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservati
 |     extrapolated-rows=disabled max-scan-range-rows=1068457
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=263B cardinality=6001215
+|     in pipelines: 02(GETNEXT)
 |
 01:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
@@ -1522,6 +1641,7 @@ Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservati
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 01(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=16.00MB Threads=3
 Per-Host Resource Estimates: Memory=176MB
@@ -1534,6 +1654,7 @@ PLAN-ROOT SINK
 03:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=2 row-size=263B cardinality=12002430
+|  in pipelines: 01(GETNEXT), 02(GETNEXT)
 |
 F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reservation=2
@@ -1541,6 +1662,7 @@ Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reserva
 |  pass-through-operands: all
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=2 row-size=263B cardinality=12002430
+|  in pipelines: 01(GETNEXT), 02(GETNEXT)
 |
 |--02:SCAN HDFS [tpch.lineitem, RANDOM]
 |     partitions=1/1 files=1 size=718.94MB
@@ -1550,6 +1672,7 @@ Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reserva
 |     extrapolated-rows=disabled max-scan-range-rows=1068457
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
 |     tuple-ids=1 row-size=263B cardinality=6001215
+|     in pipelines: 02(GETNEXT)
 |
 01:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
@@ -1559,6 +1682,7 @@ Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 01(GETNEXT)
 ====
 # Grouping aggregation
 select l_orderkey, count(*)
@@ -1578,6 +1702,7 @@ PLAN-ROOT SINK
 |  group by: l_orderkey
 |  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=1563438
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
    partitions=1/1 files=3 size=193.71MB
@@ -1587,6 +1712,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=46.50MB Threads=4
 Per-Host Resource Estimates: Memory=124MB
@@ -1599,6 +1725,7 @@ PLAN-ROOT SINK
 04:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=1563438
+|  in pipelines: 03(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
 Per-Host Resources: mem-estimate=10.00MB mem-reservation=8.50MB thread-reservation=1
@@ -1607,10 +1734,12 @@ Per-Host Resources: mem-estimate=10.00MB mem-reservation=8.50MB thread-reservati
 |  group by: l_orderkey
 |  mem-estimate=10.00MB mem-reservation=8.50MB spill-buffer=512.00KB thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=1563438
+|  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:EXCHANGE [HASH(l_orderkey)]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=1563438
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=114.00MB mem-reservation=38.00MB thread-reservation=2
@@ -1619,6 +1748,7 @@ Per-Host Resources: mem-estimate=114.00MB mem-reservation=38.00MB thread-reserva
 |  group by: l_orderkey
 |  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=1563438
+|  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.71MB
@@ -1628,6 +1758,7 @@ Per-Host Resources: mem-estimate=114.00MB mem-reservation=38.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=85.50MB Threads=5
 Per-Host Resource Estimates: Memory=248MB
@@ -1640,6 +1771,7 @@ PLAN-ROOT SINK
 04:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=1563438
+|  in pipelines: 03(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
 Per-Host Resources: mem-estimate=20.00MB mem-reservation=9.50MB thread-reservation=2
@@ -1648,10 +1780,12 @@ Per-Host Resources: mem-estimate=20.00MB mem-reservation=9.50MB thread-reservati
 |  group by: l_orderkey
 |  mem-estimate=10.00MB mem-reservation=4.75MB spill-buffer=256.00KB thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=1563438
+|  in pipelines: 03(GETNEXT), 00(OPEN)
 |
 02:EXCHANGE [HASH(l_orderkey)]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=1563438
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=228.00MB mem-reservation=76.00MB thread-reservation=2
@@ -1660,6 +1794,7 @@ Per-Host Resources: mem-estimate=228.00MB mem-reservation=76.00MB thread-reserva
 |  group by: l_orderkey
 |  mem-estimate=34.00MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=16B cardinality=1563438
+|  in pipelines: 00(GETNEXT)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.71MB
@@ -1669,6 +1804,7 @@ Per-Host Resources: mem-estimate=228.00MB mem-reservation=76.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=4.00MB thread-reservation=0
    tuple-ids=0 row-size=8B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Non-grouping aggregation with zero-slot parquet scan
 select count(*) from tpch_parquet.lineitem
@@ -1685,6 +1821,7 @@ PLAN-ROOT SINK
 |  output: sum_init_zero(tpch_parquet.lineitem.parquet-stats: num_rows)
 |  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
    partitions=1/1 files=3 size=193.71MB
@@ -1694,6 +1831,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=128.00KB Threads=3
 Per-Host Resource Estimates: Memory=21MB
@@ -1707,10 +1845,12 @@ PLAN-ROOT SINK
 |  output: count:merge(*)
 |  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 01(OPEN)
 |
 02:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 01(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=11.00MB mem-reservation=128.00KB thread-reservation=2
@@ -1718,6 +1858,7 @@ Per-Host Resources: mem-estimate=11.00MB mem-reservation=128.00KB thread-reserva
 |  output: sum_init_zero(tpch_parquet.lineitem.parquet-stats: num_rows)
 |  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.71MB
@@ -1727,6 +1868,7 @@ Per-Host Resources: mem-estimate=11.00MB mem-reservation=128.00KB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1
    tuple-ids=0 row-size=8B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=256.00KB Threads=3
 Per-Host Resource Estimates: Memory=190MB
@@ -1740,10 +1882,12 @@ PLAN-ROOT SINK
 |  output: count:merge(*)
 |  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 01(OPEN)
 |
 02:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 01(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=180.00MB mem-reservation=256.00KB thread-reservation=2
@@ -1751,6 +1895,7 @@ Per-Host Resources: mem-estimate=180.00MB mem-reservation=256.00KB thread-reserv
 |  output: sum_init_zero(tpch_parquet.lineitem.parquet-stats: num_rows)
 |  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.71MB
@@ -1760,6 +1905,7 @@ Per-Host Resources: mem-estimate=180.00MB mem-reservation=256.00KB thread-reserv
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=128.00KB thread-reservation=0
    tuple-ids=0 row-size=8B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Sort
 select *
@@ -1778,6 +1924,7 @@ PLAN-ROOT SINK
 |  order by: l_comment ASC
 |  mem-estimate=40.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=6001215
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
    partitions=1/1 files=3 size=193.71MB
@@ -1787,6 +1934,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=52.00MB Threads=3
 Per-Host Resource Estimates: Memory=120MB
@@ -1800,6 +1948,7 @@ PLAN-ROOT SINK
 |  order by: l_comment ASC
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=6001215
+|  in pipelines: 01(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=120.00MB mem-reservation=52.00MB thread-reservation=2
@@ -1807,6 +1956,7 @@ Per-Host Resources: mem-estimate=120.00MB mem-reservation=52.00MB thread-reserva
 |  order by: l_comment ASC
 |  mem-estimate=40.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=6001215
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.71MB
@@ -1816,6 +1966,7 @@ Per-Host Resources: mem-estimate=120.00MB mem-reservation=52.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=104.00MB Threads=3
 Per-Host Resource Estimates: Memory=240MB
@@ -1829,6 +1980,7 @@ PLAN-ROOT SINK
 |  order by: l_comment ASC
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=6001215
+|  in pipelines: 01(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=240.00MB mem-reservation=104.00MB thread-reservation=2
@@ -1836,6 +1988,7 @@ Per-Host Resources: mem-estimate=240.00MB mem-reservation=104.00MB thread-reserv
 |  order by: l_comment ASC
 |  mem-estimate=40.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=6001215
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.71MB
@@ -1845,6 +1998,7 @@ Per-Host Resources: mem-estimate=240.00MB mem-reservation=104.00MB thread-reserv
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=0
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # TOP-N
 select *
@@ -1864,6 +2018,7 @@ PLAN-ROOT SINK
 |  order by: l_comment ASC
 |  mem-estimate=25.66KB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=100
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.lineitem]
    partitions=1/1 files=3 size=193.71MB
@@ -1873,6 +2028,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=40.00MB Threads=3
 Per-Host Resource Estimates: Memory=80MB
@@ -1887,6 +2043,7 @@ PLAN-ROOT SINK
 |  limit: 100
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=100
+|  in pipelines: 01(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=80.03MB mem-reservation=40.00MB thread-reservation=2
@@ -1894,6 +2051,7 @@ Per-Host Resources: mem-estimate=80.03MB mem-reservation=40.00MB thread-reservat
 |  order by: l_comment ASC
 |  mem-estimate=25.66KB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=100
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.71MB
@@ -1903,6 +2061,7 @@ Per-Host Resources: mem-estimate=80.03MB mem-reservation=40.00MB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=80.00MB Threads=3
 Per-Host Resource Estimates: Memory=160MB
@@ -1917,6 +2076,7 @@ PLAN-ROOT SINK
 |  limit: 100
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=100
+|  in pipelines: 01(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=160.05MB mem-reservation=80.00MB thread-reservation=2
@@ -1924,6 +2084,7 @@ Per-Host Resources: mem-estimate=160.05MB mem-reservation=80.00MB thread-reserva
 |  order by: l_comment ASC
 |  mem-estimate=25.66KB mem-reservation=0B thread-reservation=0
 |  tuple-ids=1 row-size=263B cardinality=100
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.lineitem, RANDOM]
    partitions=1/1 files=3 size=193.71MB
@@ -1933,6 +2094,7 @@ Per-Host Resources: mem-estimate=160.05MB mem-reservation=80.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=2141802
    mem-estimate=80.00MB mem-reservation=40.00MB thread-reservation=0
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Broadcast Hash Join
 select *
@@ -1952,6 +2114,7 @@ PLAN-ROOT SINK
 |  runtime filters: RF000[bloom] <- o_orderkey
 |  mem-estimate=300.41MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
+|  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--01:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
@@ -1961,6 +2124,7 @@ PLAN-ROOT SINK
 |     extrapolated-rows=disabled max-scan-range-rows=1181132
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=191B cardinality=1500000
+|     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
@@ -1971,6 +2135,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=51.00MB Threads=5
 Per-Host Resource Estimates: Memory=477MB
@@ -1983,6 +2148,7 @@ PLAN-ROOT SINK
 04:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=389.41MB mem-reservation=43.00MB thread-reservation=2 runtime-filters-memory=1.00MB
@@ -1992,10 +2158,12 @@ Per-Host Resources: mem-estimate=389.41MB mem-reservation=43.00MB thread-reserva
 |  runtime filters: RF000[bloom] <- o_orderkey
 |  mem-estimate=300.41MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
+|  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--03:EXCHANGE [BROADCAST]
 |  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=191B cardinality=1500000
+|  |  in pipelines: 01(GETNEXT)
 |  |
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 |  Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=2
@@ -2007,6 +2175,7 @@ Per-Host Resources: mem-estimate=389.41MB mem-reservation=43.00MB thread-reserva
 |     extrapolated-rows=disabled max-scan-range-rows=1181132
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=191B cardinality=1500000
+|     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
@@ -2017,6 +2186,7 @@ Per-Host Resources: mem-estimate=389.41MB mem-reservation=43.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=102.00MB Threads=5
 Per-Host Resource Estimates: Memory=955MB
@@ -2029,6 +2199,7 @@ PLAN-ROOT SINK
 04:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=778.83MB mem-reservation=86.00MB thread-reservation=2 runtime-filters-memory=1.00MB
@@ -2039,6 +2210,7 @@ Per-Host Resources: mem-estimate=778.83MB mem-reservation=86.00MB thread-reserva
 |  runtime filters: RF000[bloom] <- o_orderkey
 |  mem-estimate=300.41MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
+|  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--F03:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
 |  |  Per-Host Resources: included in parent fragment
@@ -2050,6 +2222,7 @@ Per-Host Resources: mem-estimate=778.83MB mem-reservation=86.00MB thread-reserva
 |  03:EXCHANGE [BROADCAST]
 |  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=191B cardinality=1500000
+|  |  in pipelines: 01(GETNEXT)
 |  |
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
 |  Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reservation=2
@@ -2061,6 +2234,7 @@ Per-Host Resources: mem-estimate=778.83MB mem-reservation=86.00MB thread-reserva
 |     extrapolated-rows=disabled max-scan-range-rows=1181132
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
 |     tuple-ids=1 row-size=191B cardinality=1500000
+|     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
@@ -2071,6 +2245,7 @@ Per-Host Resources: mem-estimate=778.83MB mem-reservation=86.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Shuffle Hash Join
 select *
@@ -2090,6 +2265,7 @@ PLAN-ROOT SINK
 |  runtime filters: RF000[bloom] <- o_orderkey
 |  mem-estimate=300.41MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
+|  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--01:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
@@ -2099,6 +2275,7 @@ PLAN-ROOT SINK
 |     extrapolated-rows=disabled max-scan-range-rows=1181132
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=191B cardinality=1500000
+|     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
@@ -2109,6 +2286,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=52.00MB Threads=6
 Per-Host Resource Estimates: Memory=278MB
@@ -2121,6 +2299,7 @@ PLAN-ROOT SINK
 05:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
+|  in pipelines: 00(GETNEXT)
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=3
 Per-Host Resources: mem-estimate=101.14MB mem-reservation=35.00MB thread-reservation=1 runtime-filters-memory=1.00MB
@@ -2130,10 +2309,12 @@ Per-Host Resources: mem-estimate=101.14MB mem-reservation=35.00MB thread-reserva
 |  runtime filters: RF000[bloom] <- o_orderkey
 |  mem-estimate=100.14MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
+|  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--04:EXCHANGE [HASH(o_orderkey)]
 |  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=191B cardinality=1500000
+|  |  in pipelines: 01(GETNEXT)
 |  |
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 |  Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=2
@@ -2145,10 +2326,12 @@ Per-Host Resources: mem-estimate=101.14MB mem-reservation=35.00MB thread-reserva
 |     extrapolated-rows=disabled max-scan-range-rows=1181132
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=191B cardinality=1500000
+|     in pipelines: 01(GETNEXT)
 |
 03:EXCHANGE [HASH(l_orderkey)]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=263B cardinality=6001215
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=89.00MB mem-reservation=9.00MB thread-reservation=2 runtime-filters-memory=1.00MB
@@ -2161,6 +2344,7 @@ Per-Host Resources: mem-estimate=89.00MB mem-reservation=9.00MB thread-reservati
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=104.00MB Threads=7
 Per-Host Resource Estimates: Memory=456MB
@@ -2173,6 +2357,7 @@ PLAN-ROOT SINK
 05:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
+|  in pipelines: 00(GETNEXT)
 |
 F02:PLAN FRAGMENT [HASH(l_orderkey)] hosts=3 instances=6
 Per-Host Resources: mem-estimate=102.14MB mem-reservation=70.00MB thread-reservation=2 runtime-filters-memory=1.00MB
@@ -2183,6 +2368,7 @@ Per-Host Resources: mem-estimate=102.14MB mem-reservation=70.00MB thread-reserva
 |  runtime filters: RF000[bloom] <- o_orderkey
 |  mem-estimate=50.07MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=5757710
+|  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--F04:PLAN FRAGMENT [HASH(l_orderkey)] hosts=2 instances=4
 |  |  Per-Host Resources: included in parent fragment
@@ -2194,6 +2380,7 @@ Per-Host Resources: mem-estimate=102.14MB mem-reservation=70.00MB thread-reserva
 |  04:EXCHANGE [HASH(o_orderkey)]
 |  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=191B cardinality=1500000
+|  |  in pipelines: 01(GETNEXT)
 |  |
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
 |  Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reservation=2
@@ -2205,10 +2392,12 @@ Per-Host Resources: mem-estimate=102.14MB mem-reservation=70.00MB thread-reserva
 |     extrapolated-rows=disabled max-scan-range-rows=1181132
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
 |     tuple-ids=1 row-size=191B cardinality=1500000
+|     in pipelines: 01(GETNEXT)
 |
 03:EXCHANGE [HASH(l_orderkey)]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=263B cardinality=6001215
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=178.00MB mem-reservation=18.00MB thread-reservation=2 runtime-filters-memory=1.00MB
@@ -2221,6 +2410,7 @@ Per-Host Resources: mem-estimate=178.00MB mem-reservation=18.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Nested loop join
 select *
@@ -2237,6 +2427,7 @@ PLAN-ROOT SINK
 02:NESTED LOOP JOIN [CROSS JOIN]
 |  mem-estimate=273.10MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=9001822500000
+|  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--01:SCAN HDFS [tpch.orders]
 |     partitions=1/1 files=1 size=162.56MB
@@ -2246,6 +2437,7 @@ PLAN-ROOT SINK
 |     extrapolated-rows=disabled max-scan-range-rows=1181132
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=191B cardinality=1500000
+|     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch.lineitem]
    partitions=1/1 files=1 size=718.94MB
@@ -2255,6 +2447,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=16.00MB Threads=5
 Per-Host Resource Estimates: Memory=449MB
@@ -2267,16 +2460,19 @@ PLAN-ROOT SINK
 04:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=9001822500000
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=361.10MB mem-reservation=8.00MB thread-reservation=2
 02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST]
 |  mem-estimate=273.10MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=9001822500000
+|  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--03:EXCHANGE [BROADCAST]
 |  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=191B cardinality=1500000
+|  |  in pipelines: 01(GETNEXT)
 |  |
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 |  Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=2
@@ -2288,6 +2484,7 @@ Per-Host Resources: mem-estimate=361.10MB mem-reservation=8.00MB thread-reservat
 |     extrapolated-rows=disabled max-scan-range-rows=1181132
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=191B cardinality=1500000
+|     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
@@ -2297,6 +2494,7 @@ Per-Host Resources: mem-estimate=361.10MB mem-reservation=8.00MB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=32.00MB Threads=5
 Per-Host Resource Estimates: Memory=898MB
@@ -2309,6 +2507,7 @@ PLAN-ROOT SINK
 04:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=9001822500000
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=722.21MB mem-reservation=16.00MB thread-reservation=2
@@ -2316,6 +2515,7 @@ Per-Host Resources: mem-estimate=722.21MB mem-reservation=16.00MB thread-reserva
 |  join table id: 00
 |  mem-estimate=273.10MB mem-reservation=0B thread-reservation=0
 |  tuple-ids=0,1 row-size=454B cardinality=9001822500000
+|  in pipelines: 00(GETNEXT), 01(OPEN)
 |
 |--F03:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
 |  |  Per-Host Resources: included in parent fragment
@@ -2326,6 +2526,7 @@ Per-Host Resources: mem-estimate=722.21MB mem-reservation=16.00MB thread-reserva
 |  03:EXCHANGE [BROADCAST]
 |  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  |  tuple-ids=1 row-size=191B cardinality=1500000
+|  |  in pipelines: 01(GETNEXT)
 |  |
 |  F01:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
 |  Per-Host Resources: mem-estimate=176.00MB mem-reservation=16.00MB thread-reservation=2
@@ -2337,6 +2538,7 @@ Per-Host Resources: mem-estimate=722.21MB mem-reservation=16.00MB thread-reserva
 |     extrapolated-rows=disabled max-scan-range-rows=1181132
 |     mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
 |     tuple-ids=1 row-size=191B cardinality=1500000
+|     in pipelines: 01(GETNEXT)
 |
 00:SCAN HDFS [tpch.lineitem, RANDOM]
    partitions=1/1 files=1 size=718.94MB
@@ -2346,6 +2548,7 @@ Per-Host Resources: mem-estimate=722.21MB mem-reservation=16.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=1068457
    mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
    tuple-ids=0 row-size=263B cardinality=6001215
+   in pipelines: 00(GETNEXT)
 ====
 # Empty set node
 select * from functional.alltypes where 1 = 2
@@ -2362,6 +2565,7 @@ PLAN-ROOT SINK
 00:EMPTYSET
    mem-estimate=0B mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=0B cardinality=0
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=0B Threads=1
 Per-Host Resource Estimates: Memory=10MB
@@ -2375,6 +2579,7 @@ PLAN-ROOT SINK
 00:EMPTYSET
    mem-estimate=0B mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=0B cardinality=0
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=0B Threads=1
 Per-Host Resource Estimates: Memory=10MB
@@ -2388,6 +2593,7 @@ PLAN-ROOT SINK
 00:EMPTYSET
    mem-estimate=0B mem-reservation=0B thread-reservation=0
    tuple-ids=0 row-size=0B cardinality=0
+   in pipelines: 00(GETNEXT)
 ====
 # Analytic function
 select max(tinyint_col) over(partition by int_col)
@@ -2407,11 +2613,13 @@ PLAN-ROOT SINK
 |  partition by: int_col
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3,2 row-size=6B cardinality=7300
+|  in pipelines: 01(GETNEXT)
 |
 01:SORT
 |  order by: int_col ASC NULLS FIRST
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3 row-size=5B cardinality=7300
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -2422,6 +2630,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=310
    mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=5B cardinality=7300
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=10.03MB Threads=4
 Per-Host Resource Estimates: Memory=26MB
@@ -2435,6 +2644,7 @@ PLAN-ROOT SINK
 04:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=3,2 row-size=6B cardinality=7300
+|  in pipelines: 01(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=3
 Per-Host Resources: mem-estimate=10.00MB mem-reservation=10.00MB thread-reservation=1
@@ -2443,15 +2653,18 @@ Per-Host Resources: mem-estimate=10.00MB mem-reservation=10.00MB thread-reservat
 |  partition by: int_col
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3,2 row-size=6B cardinality=7300
+|  in pipelines: 01(GETNEXT)
 |
 01:SORT
 |  order by: int_col ASC NULLS FIRST
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3 row-size=5B cardinality=7300
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 03:EXCHANGE [HASH(int_col)]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=5B cardinality=7300
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
 Per-Host Resources: mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=2
@@ -2464,6 +2677,7 @@ Per-Host Resources: mem-estimate=16.00MB mem-reservation=32.00KB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=310
    mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=5B cardinality=7300
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=20.06MB Threads=5
 Per-Host Resource Estimates: Memory=52MB
@@ -2477,6 +2691,7 @@ PLAN-ROOT SINK
 04:EXCHANGE [UNPARTITIONED]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=3,2 row-size=6B cardinality=7300
+|  in pipelines: 01(GETNEXT)
 |
 F01:PLAN FRAGMENT [HASH(int_col)] hosts=3 instances=6
 Per-Host Resources: mem-estimate=20.00MB mem-reservation=20.00MB thread-reservation=2
@@ -2485,15 +2700,18 @@ Per-Host Resources: mem-estimate=20.00MB mem-reservation=20.00MB thread-reservat
 |  partition by: int_col
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3,2 row-size=6B cardinality=7300
+|  in pipelines: 01(GETNEXT)
 |
 01:SORT
 |  order by: int_col ASC NULLS FIRST
 |  mem-estimate=6.00MB mem-reservation=6.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=3 row-size=5B cardinality=7300
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 03:EXCHANGE [HASH(int_col)]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=0 row-size=5B cardinality=7300
+|  in pipelines: 00(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
 Per-Host Resources: mem-estimate=32.00MB mem-reservation=64.00KB thread-reservation=2
@@ -2506,6 +2724,7 @@ Per-Host Resources: mem-estimate=32.00MB mem-reservation=64.00KB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=310
    mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=0
    tuple-ids=0 row-size=5B cardinality=7300
+   in pipelines: 00(GETNEXT)
 ====
 # Pipeline of blocking operators from analytic fns. Blocking operators break
 # the pipeline so they do not all consume resources concurrently.
@@ -2528,11 +2747,13 @@ PLAN-ROOT SINK
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=10,5 row-size=215B cardinality=1500000
+|  in pipelines: 05(GETNEXT)
 |
 05:SORT
 |  order by: o_orderpriority ASC
 |  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=10 row-size=207B cardinality=1500000
+|  in pipelines: 05(GETNEXT), 03(OPEN)
 |
 04:ANALYTIC
 |  functions: row_number()
@@ -2540,11 +2761,13 @@ PLAN-ROOT SINK
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=8,4 row-size=207B cardinality=1500000
+|  in pipelines: 03(GETNEXT)
 |
 03:SORT
 |  order by: o_orderdate ASC
 |  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=8 row-size=199B cardinality=1500000
+|  in pipelines: 03(GETNEXT), 01(OPEN)
 |
 02:ANALYTIC
 |  functions: row_number()
@@ -2552,11 +2775,13 @@ PLAN-ROOT SINK
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=6,3 row-size=199B cardinality=1500000
+|  in pipelines: 01(GETNEXT)
 |
 01:SORT
 |  order by: o_totalprice ASC
 |  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=6 row-size=191B cardinality=1500000
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.orders]
    partitions=1/1 files=2 size=54.07MB
@@ -2566,6 +2791,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=1177135
    mem-estimate=40.00MB mem-reservation=24.00MB thread-reservation=1
    tuple-ids=0 row-size=191B cardinality=1500000
+   in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=68.00MB Threads=3
 Per-Host Resource Estimates: Memory=102MB
@@ -2581,11 +2807,13 @@ PLAN-ROOT SINK
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=10,5 row-size=215B cardinality=1500000
+|  in pipelines: 05(GETNEXT)
 |
 05:SORT
 |  order by: o_orderpriority ASC
 |  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=10 row-size=207B cardinality=1500000
+|  in pipelines: 05(GETNEXT), 03(OPEN)
 |
 04:ANALYTIC
 |  functions: row_number()
@@ -2593,11 +2821,13 @@ PLAN-ROOT SINK
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=8,4 row-size=207B cardinality=1500000
+|  in pipelines: 03(GETNEXT)
 |
 03:SORT
 |  order by: o_orderdate ASC
 |  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=8 row-size=199B cardinality=1500000
+|  in pipelines: 03(GETNEXT), 01(OPEN)
 |
 02:ANALYTIC
 |  functions: row_number()
@@ -2605,11 +2835,13 @@ PLAN-ROOT SINK
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=6,3 row-size=199B cardinality=1500000
+|  in pipelines: 01(GETNEXT)
 |
 07:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: o_totalprice ASC
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=6 row-size=191B cardinality=1500000
+|  in pipelines: 01(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
 Per-Host Resources: mem-estimate=58.00MB mem-reservation=36.00MB thread-reservation=2
@@ -2617,6 +2849,7 @@ Per-Host Resources: mem-estimate=58.00MB mem-reservation=36.00MB thread-reservat
 |  order by: o_totalprice ASC
 |  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=6 row-size=191B cardinality=1500000
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.orders, RANDOM]
    partitions=1/1 files=2 size=54.07MB
@@ -2626,6 +2859,7 @@ Per-Host Resources: mem-estimate=58.00MB mem-reservation=36.00MB thread-reservat
    extrapolated-rows=disabled max-scan-range-rows=1177135
    mem-estimate=40.00MB mem-reservation=24.00MB thread-reservation=1
    tuple-ids=0 row-size=191B cardinality=1500000
+   in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=104.00MB Threads=3
 Per-Host Resource Estimates: Memory=160MB
@@ -2641,11 +2875,13 @@ PLAN-ROOT SINK
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=10,5 row-size=215B cardinality=1500000
+|  in pipelines: 05(GETNEXT)
 |
 05:SORT
 |  order by: o_orderpriority ASC
 |  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=10 row-size=207B cardinality=1500000
+|  in pipelines: 05(GETNEXT), 03(OPEN)
 |
 04:ANALYTIC
 |  functions: row_number()
@@ -2653,11 +2889,13 @@ PLAN-ROOT SINK
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=8,4 row-size=207B cardinality=1500000
+|  in pipelines: 03(GETNEXT)
 |
 03:SORT
 |  order by: o_orderdate ASC
 |  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=8 row-size=199B cardinality=1500000
+|  in pipelines: 03(GETNEXT), 01(OPEN)
 |
 02:ANALYTIC
 |  functions: row_number()
@@ -2665,11 +2903,13 @@ PLAN-ROOT SINK
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=6,3 row-size=199B cardinality=1500000
+|  in pipelines: 01(GETNEXT)
 |
 07:MERGING-EXCHANGE [UNPARTITIONED]
 |  order by: o_totalprice ASC
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=6 row-size=191B cardinality=1500000
+|  in pipelines: 01(GETNEXT)
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=4
 Per-Host Resources: mem-estimate=116.00MB mem-reservation=72.00MB thread-reservation=2
@@ -2677,6 +2917,7 @@ Per-Host Resources: mem-estimate=116.00MB mem-reservation=72.00MB thread-reserva
 |  order by: o_totalprice ASC
 |  mem-estimate=18.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=6 row-size=191B cardinality=1500000
+|  in pipelines: 01(GETNEXT), 00(OPEN)
 |
 00:SCAN HDFS [tpch_parquet.orders, RANDOM]
    partitions=1/1 files=2 size=54.07MB
@@ -2686,6 +2927,7 @@ Per-Host Resources: mem-estimate=116.00MB mem-reservation=72.00MB thread-reserva
    extrapolated-rows=disabled max-scan-range-rows=1177135
    mem-estimate=40.00MB mem-reservation=24.00MB thread-reservation=0
    tuple-ids=0 row-size=191B cardinality=1500000
+   in pipelines: 00(GETNEXT)
 ====
 # Union with non-trivial branches: each branch executes sequentially within fragment.
 select distinct l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
@@ -2712,6 +2954,7 @@ PLAN-ROOT SINK
 |  pass-through-operands: 04
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |  tuple-ids=7 row-size=70B cardinality=2549844
+|  in pipelines: 04(GETNEXT), 05(GETNEXT), 08(GETNEXT)
 |
 |--10:HASH JOIN [INNER JOIN]
 |  |  hash predicates: l_orderkey = o_orderkey
@@ -2719,6 +2962,7 @@ PLAN-ROOT SINK
 |  |  runtime filters: RF004[bloom] <- o_orderkey
 |  |  mem-estimate=17.00MB mem-reservation=17.00MB spill-buffer=1.00MB thread-reservation=0
 |  |  tuple-ids=5,6 row-size=99B cardinality=822530
+|  |  in pipelines: 08(GETNEXT), 09(OPEN)
 |  |
 |  |--09:SCAN HDFS [tpch_parquet.orders]
 |  |     partitions=1/1 files=2 size=54.07MB
@@ -2728,6 +2972,7 @@ PLAN-ROOT SINK
 |  |     extrapolated-rows=disabled max-scan-range-rows=1177135
 |  |     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
 |  |     tuple-ids=6 row-size=8B cardinality=1500000
+|  |     in pipelines: 09(GETNEXT)
 |  |
 |  08:SCAN HDFS [tpch_parquet.lineitem]
 |     partitions=1/1 files=3 size=193.71MB
@@ -2741,6 +2986,7 @@ PLAN-ROOT SINK
 |     parquet dictionary predicates: l_shipmode = 'F'
 |     mem-estimate=80.00MB mem-reservation=24.00MB thread-reservation=1
 |     tuple-ids=5 row-size=91B cardinality=857316
+|     in pipelines: 08(GETNEXT)
 |
 |--07:HASH JOIN [INNER JOIN]
 |  |  hash predicates: l_orderkey = o_orderkey
@@ -2748,6 +2994,7 @@ PLAN-ROOT SINK
 |  |  runtime filters: RF002[bloom] <- o_orderkey
 |  |  mem-estimate=17.00MB mem-reservation=17.00MB spill-buffer=1.00MB thread-reservation=0
 |  |  tuple-ids=3,4 row-size=103B cardinality=1151542
+|  |  in pipelines: 05(GETNEXT), 06(OPEN)
 |  |
 |  |--06:SCAN HDFS [tpch_parquet.orders]
 |  |     partitions=1/1 files=2 size=54.07MB
@@ -2760,6 +3007,7 @@ PLAN-ROOT SINK
 |  |     parquet dictionary predicates: o_orderpriority = '2-HIGH'
 |  |     mem-estimate=40.00MB mem-reservation=8.00MB thread-reservation=1
 |  |     tuple-ids=4 row-size=32B cardinality=300000
+|  |     in pipelines: 06(GETNEXT)
 |  |
 |  05:SCAN HDFS [tpch_parquet.lineitem]
 |     partitions=1/1 files=3 size=193.71MB
@@ -2770,11 +3018,13 @@ PLAN-ROOT SINK
 |     extrapolated-rows=disabled max-scan-range-rows=2141802
 |     mem-estimate=80.00MB mem-reservation=24.00MB thread-reservation=1
 |     tuple-ids=3 row-size=70B cardinality=6001215
+|     in pipelines: 05(GETNEXT)
 |
 04:AGGREGATE [FINALIZE]
 |  group by: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_comment
 |  mem-estimate=42.58MB mem-reservation=34.00MB spill-buffer=2.00MB thread-reservation=0
 |  tuple-ids=2 row-size=70B cardinality=575772
+|  in pipelines: 04(GETNEXT), 01(OPEN)
 |
 03:HASH JOIN [INNER JOIN]
 |  hash predicates: l_orderkey = o_orderkey
@@ -2782,6 +3032,7 @@ PLAN-ROOT SINK
 |  runtime filters: RF000[bloom] <- o_orderkey
 |  mem-estimate=17.00MB mem-reservation=17.00MB spill-buffer=1.00MB thread-reservation=0
 |  tuple-ids=0,1 row-size=86B cardinality=575772
+|  in pipelines: 01(GETNEXT), 02(OPEN)
 |
 |--02:SCAN HDFS [tpch_parquet.orders]
 |     partitions=1/1 files=2 size=54.07MB
@@ -2791,6 +3042,7 @@ PLAN-ROOT SINK
 |     extrapolated-rows=disabled max-scan-range-rows=1177135
 |     mem-estimate=40.00MB mem-reservation=4.00MB thread-reservation=1
 |     tuple-ids=1 row-size=8B cardinality=1500000
+|     in pipelines: 02(GETNEXT)
 |
 01:SCAN HDFS [tpch_parquet.lineitem]
    partitions=1/1 files=3 size=193.71MB
@@ -2804,6 +3056,7 @@ PLAN-ROOT SINK
    parquet dictionary predicates: l_tax > 10
    mem-estimate=80.00MB me

<TRUNCATED>