Posted to commits@hive.apache.org by kr...@apache.org on 2022/03/23 03:01:26 UTC

[hive] branch master updated: HIVE-26043: Use constraint info when creating RexNodes (Krisztian Kasa, reviewed by Stamatis Zampetakis)

This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 13016e5  HIVE-26043: Use constraint info when creating RexNodes (Krisztian Kasa, reviewed by Stamatis Zampetakis)
13016e5 is described below

commit 13016e514bce3b3e39cd7253d5acab5bf955cbfe
Author: Krisztian Kasa <ka...@gmail.com>
AuthorDate: Wed Mar 23 04:01:10 2022 +0100

    HIVE-26043: Use constraint info when creating RexNodes (Krisztian Kasa, reviewed by Stamatis Zampetakis)
---
 .../org/apache/hadoop/hive/ql/exec/ColumnInfo.java |  29 ++-
 .../calcite/translator/TypeConverter.java          |  22 +-
 .../hadoop/hive/ql/parse/CalcitePlanner.java       |  82 ++-----
 .../hive/ql/parse/type/RexNodeExprFactory.java     |   6 +-
 .../perf/tpcds30tb/tez/cbo_query45.q.out           |  22 +-
 .../perf/tpcds30tb/tez/query45.q.out               | 244 +++++++++------------
 6 files changed, 171 insertions(+), 234 deletions(-)
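
Context for the change: before this commit the CBO path created every column type as nullable and a separate inferNotNullableColumns() pass patched the row type afterwards; now nullability is derived once per column from NOT NULL and PRIMARY KEY constraint metadata and carried on ColumnInfo through type conversion. A minimal sketch of the Calcite mechanism involved, not code from this commit (the factory instance is illustrative):

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.rel.type.RelDataTypeSystem;
    import org.apache.calcite.sql.type.SqlTypeName;

    RelDataTypeFactory factory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    RelDataType bigint = factory.createSqlType(SqlTypeName.BIGINT);
    // Columns covered by a NOT NULL or PRIMARY KEY constraint get a
    // non-nullable type; everything else stays nullable as before.
    RelDataType notNullBigint = factory.createTypeWithNullability(bigint, false);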

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
index 3f237af..04ff844 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
@@ -61,7 +61,10 @@ public class ColumnInfo implements Serializable {
 
   private String typeName;
 
+  private final boolean nullable;
+
   public ColumnInfo() {
+    nullable = true;
   }
 
   public ColumnInfo(String internalName, TypeInfo type, String tabAlias,
@@ -69,6 +72,11 @@ public class ColumnInfo implements Serializable {
     this(internalName, type, tabAlias, isVirtualCol, false);
   }
 
+  public ColumnInfo(String internalName, TypeInfo type, boolean nullable, String tabAlias,
+      boolean isVirtualCol) {
+    this(internalName, type, nullable, tabAlias, isVirtualCol, false);
+  }
+
   public ColumnInfo(String internalName, Class type, String tabAlias,
       boolean isVirtualCol) {
     this(internalName, TypeInfoFactory
@@ -80,6 +88,17 @@ public class ColumnInfo implements Serializable {
       boolean isVirtualCol, boolean isHiddenVirtualCol) {
     this(internalName,
          TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(type),
+         true,
+         tabAlias,
+         isVirtualCol,
+         isHiddenVirtualCol);
+  }
+
+  public ColumnInfo(String internalName, TypeInfo type, boolean nullable, String tabAlias,
+      boolean isVirtualCol, boolean isHiddenVirtualCol) {
+    this(internalName,
+         TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(type),
+         nullable,
          tabAlias,
          isVirtualCol,
          isHiddenVirtualCol);
@@ -87,16 +106,17 @@ public class ColumnInfo implements Serializable {
 
   public ColumnInfo(String internalName, ObjectInspector objectInspector,
       String tabAlias, boolean isVirtualCol) {
-    this(internalName, objectInspector, tabAlias, isVirtualCol, false);
+    this(internalName, objectInspector, true, tabAlias, isVirtualCol, false);
   }
 
-  public ColumnInfo(String internalName, ObjectInspector objectInspector,
+  public ColumnInfo(String internalName, ObjectInspector objectInspector, boolean nullable,
       String tabAlias, boolean isVirtualCol, boolean isHiddenVirtualCol) {
     this.internalName = internalName;
     this.objectInspector = objectInspector;
     this.tabAlias = tabAlias;
     this.isVirtualCol = isVirtualCol;
     this.isHiddenVirtualCol = isHiddenVirtualCol;
+    this.nullable = nullable;
     setTypeName(getType().getTypeName());
   }
 
@@ -107,6 +127,7 @@ public class ColumnInfo implements Serializable {
     this.tabAlias = columnInfo.getTabAlias();
     this.isVirtualCol = columnInfo.getIsVirtualCol();
     this.isHiddenVirtualCol = columnInfo.isHiddenVirtualCol();
+    this.nullable = columnInfo.nullable;
     this.setType(columnInfo.getType());
   }
 
@@ -254,4 +275,8 @@ public class ColumnInfo implements Serializable {
   public void setObjectinspector(ObjectInspector writableObjectInspector) {
     this.objectInspector = writableObjectInspector;
   }
+
+  public boolean isNullable() {
+    return nullable;
+  }
 }
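
The new ColumnInfo constructors thread a nullable flag through to the planner. A minimal usage sketch, with made-up column and table names (not taken from this commit):

    import org.apache.hadoop.hive.ql.exec.ColumnInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    ColumnInfo ci = new ColumnInfo(
        "c_customer_sk",              // internal column name (illustrative)
        TypeInfoFactory.longTypeInfo, // Hive type
        false,                        // nullable: false when a NOT NULL/PK constraint covers the column
        "customer",                   // table alias (illustrative)
        false);                       // isVirtualCol
    // isNullable() exposes the flag to the Calcite translation layer.
    assert !ci.isNullable();
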
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/TypeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/TypeConverter.java
index e95ff18..7d22797 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/TypeConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/TypeConverter.java
@@ -128,25 +128,29 @@ public class TypeConverter {
     RexBuilder rexBuilder = cluster.getRexBuilder();
     RelDataTypeFactory dtFactory = rexBuilder.getTypeFactory();
     RowSchema rs = rr.getRowSchema();
-    List<RelDataType> fieldTypes = new LinkedList<RelDataType>();
-    List<String> fieldNames = new LinkedList<String>();
+    List<RelDataType> fieldTypes = new LinkedList<>();
+    List<String> fieldNames = new LinkedList<>();
 
     for (ColumnInfo ci : rs.getSignature()) {
       if (neededCols == null || neededCols.contains(ci.getInternalName())) {
-        fieldTypes.add(convert(ci.getType(), dtFactory));
+        fieldTypes.add(convert(ci.getType(), ci.isNullable(), dtFactory));
         fieldNames.add(ci.getInternalName());
       }
     }
     return dtFactory.createStructType(fieldTypes, fieldNames);
   }
 
-  public static RelDataType convert(TypeInfo type, RelDataTypeFactory dtFactory)
+  public static RelDataType convert(TypeInfo type, RelDataTypeFactory dtFactory) throws CalciteSemanticException {
+    return convert(type, true, dtFactory);
+  }
+
+  public static RelDataType convert(TypeInfo type, boolean nullable, RelDataTypeFactory dtFactory)
       throws CalciteSemanticException {
     RelDataType convertedType = null;
 
     switch (type.getCategory()) {
     case PRIMITIVE:
-      convertedType = convert((PrimitiveTypeInfo) type, dtFactory);
+      convertedType = convert((PrimitiveTypeInfo) type, nullable, dtFactory);
       break;
     case LIST:
       convertedType = convert((ListTypeInfo) type, dtFactory);
@@ -162,10 +166,14 @@ public class TypeConverter {
       break;
     }
     // hive does not have concept of not nullable types
-    return dtFactory.createTypeWithNullability(convertedType, true);
+    return dtFactory.createTypeWithNullability(convertedType, nullable);
   }
 
   public static RelDataType convert(PrimitiveTypeInfo type, RelDataTypeFactory dtFactory) {
+    return convert(type, true, dtFactory);
+  }
+
+  public static RelDataType convert(PrimitiveTypeInfo type, boolean nullable, RelDataTypeFactory dtFactory) {
     RelDataType convertedType = null;
 
     switch (type.getPrimitiveCategory()) {
@@ -242,7 +250,7 @@ public class TypeConverter {
       throw new RuntimeException("Unsupported Type : " + type.getTypeName());
     }
 
-    return dtFactory.createTypeWithNullability(convertedType, true);
+    return dtFactory.createTypeWithNullability(convertedType, nullable);
   }
 
   public static RelDataType convert(ListTypeInfo lstType,
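
The old two-argument convert() overloads now delegate with nullable = true, so existing callers keep the previous always-nullable behavior while constraint-aware callers pass the flag explicitly. A hedged usage sketch (the factory instance is illustrative):

    import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.rel.type.RelDataTypeSystem;
    import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    RelDataTypeFactory dtFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    RelDataType t = TypeConverter.convert(TypeInfoFactory.longTypeInfo, false, dtFactory);
    // t.isNullable() == false here; TypeConverter.convert(type, dtFactory)
    // still produces a nullable type, matching the old behavior.
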
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index bc55d0e..a76ce37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -2913,13 +2913,17 @@ public class CalcitePlanner extends SemanticAnalyzer {
         List<? extends StructField> fields = rowObjectInspector.getAllStructFieldRefs();
         ColumnInfo colInfo;
         String colName;
-        ArrayList<ColumnInfo> cInfoLst = new ArrayList<ColumnInfo>();
-        for (int i = 0; i < fields.size(); i++) {
-          colName = fields.get(i).getFieldName();
+        ArrayList<ColumnInfo> cInfoLst = new ArrayList<>();
+
+        final NotNullConstraint nnc = tabMetaData.getNotNullConstraint();
+        final PrimaryKeyInfo pkc = tabMetaData.getPrimaryKeyInfo();
+
+        for (StructField structField : fields) {
+          colName = structField.getFieldName();
           colInfo = new ColumnInfo(
-              fields.get(i).getFieldName(),
-              TypeInfoUtils.getTypeInfoFromObjectInspector(fields.get(i).getFieldObjectInspector()),
-              tableAlias, false);
+                  structField.getFieldName(),
+                  TypeInfoUtils.getTypeInfoFromObjectInspector(structField.getFieldObjectInspector()),
+                  isNullable(colName, nnc, pkc), tableAlias, false);
           colInfo.setSkewedCol(isSkewedCol(tableAlias, qb, colName));
           rr.put(tableAlias, colName, colInfo);
           cInfoLst.add(colInfo);
@@ -2932,7 +2936,8 @@ public class CalcitePlanner extends SemanticAnalyzer {
         for (FieldSchema part_col : tabMetaData.getPartCols()) {
           colName = part_col.getName();
           colInfo = new ColumnInfo(colName,
-              TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()), tableAlias, true);
+                  TypeInfoFactory.getPrimitiveTypeInfo(part_col.getType()),
+                  isNullable(colName, nnc, pkc), tableAlias, true);
           rr.put(tableAlias, colName, colInfo);
           cInfoLst.add(colInfo);
           partitionColumns.add(colInfo);
@@ -3082,7 +3087,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
           }
         } else {
           // Build row type from field <type, name>
-          RelDataType rowType = inferNotNullableColumns(tabMetaData, TypeConverter.getType(cluster, rr, null));
+          RelDataType rowType = TypeConverter.getType(cluster, rr, null);
           // Build RelOptAbstractTable
           List<String> fullyQualifiedTabName = new ArrayList<>();
           if (tabMetaData.getDbName() != null && !tabMetaData.getDbName().isEmpty()) {
@@ -3119,65 +3124,16 @@ public class CalcitePlanner extends SemanticAnalyzer {
       return tableRel;
     }
 
-    private RelDataType inferNotNullableColumns(Table tabMetaData, RelDataType rowType)
-        throws HiveException {
-      final NotNullConstraint nnc = tabMetaData.getNotNullConstraint();
-      final PrimaryKeyInfo pkc = tabMetaData.getPrimaryKeyInfo();
-      if ((nnc == null || nnc.getNotNullConstraints().isEmpty()) &&
-          (pkc == null || pkc.getColNames().isEmpty())) {
-        return rowType;
+    private boolean isNullable(String colName, NotNullConstraint notNullConstraints, PrimaryKeyInfo primaryKeyInfo) {
+      if (notNullConstraints != null && notNullConstraints.getNotNullConstraints().containsValue(colName)) {
+        return false;
       }
 
-      // Build the bitset with not null columns
-      ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
-      if (nnc != null) {
-        for (String nnCol : nnc.getNotNullConstraints().values()) {
-          int nnPos = -1;
-          for (int i = 0; i < rowType.getFieldNames().size(); i++) {
-            if (rowType.getFieldNames().get(i).equals(nnCol)) {
-              nnPos = i;
-              break;
-            }
-          }
-          if (nnPos == -1) {
-            LOG.error("Column for not null constraint definition " + nnCol + " not found");
-            return rowType;
-          }
-          builder.set(nnPos);
-        }
-      }
-      if (pkc != null) {
-        for (String pkCol : pkc.getColNames().values()) {
-          int pkPos = -1;
-          for (int i = 0; i < rowType.getFieldNames().size(); i++) {
-            if (rowType.getFieldNames().get(i).equals(pkCol)) {
-              pkPos = i;
-              break;
-            }
-          }
-          if (pkPos == -1) {
-            LOG.error("Column for not null constraint definition " + pkCol + " not found");
-            return rowType;
-          }
-          builder.set(pkPos);
-        }
+      if (primaryKeyInfo != null && primaryKeyInfo.getColNames().containsValue(colName)) {
+        return false;
       }
-      ImmutableBitSet bitSet = builder.build();
 
-      RexBuilder rexBuilder = cluster.getRexBuilder();
-      RelDataTypeFactory dtFactory = rexBuilder.getTypeFactory();
-
-      List<RelDataType> fieldTypes = new LinkedList<RelDataType>();
-      List<String> fieldNames = new LinkedList<String>();
-      for (RelDataTypeField rdtf : rowType.getFieldList()) {
-        if (bitSet.indexOf(rdtf.getIndex()) != -1) {
-          fieldTypes.add(dtFactory.createTypeWithNullability(rdtf.getType(), false));
-        } else {
-          fieldTypes.add(rdtf.getType());
-        }
-        fieldNames.add(rdtf.getName());
-      }
-      return dtFactory.createStructType(fieldTypes, fieldNames);
+      return true;
     }
 
     private TableType obtainTableType(Table tabMetaData) {
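
The replaced position-based bitset scan becomes a per-column containment check: a column is nullable unless its name appears among the table's NOT NULL constraint columns or its primary key columns. A standalone sketch of that lookup, with plain Maps standing in for the values exposed by Hive's NotNullConstraint and PrimaryKeyInfo metadata:

    import java.util.Map;

    // Stand-ins for nnc.getNotNullConstraints() and pkc.getColNames(),
    // both of which carry column names as map values.
    Map<String, String> notNullCols = Map.of("nn1", "c_customer_sk");
    Map<Integer, String> pkCols = Map.of(1, "ca_address_sk");

    String colName = "c_customer_sk";
    boolean nullable = !(notNullCols.containsValue(colName)
        || pkCols.containsValue(colName)); // false: a NOT NULL constraint covers it
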
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/type/RexNodeExprFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/type/RexNodeExprFactory.java
index 979b4f6..a572038 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/type/RexNodeExprFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/type/RexNodeExprFactory.java
@@ -31,7 +31,6 @@ import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
@@ -62,7 +61,6 @@ import org.apache.hadoop.hive.common.type.TimestampTZ;
 import org.apache.hadoop.hive.common.type.TimestampTZUtil;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
@@ -76,8 +74,6 @@ import org.apache.hadoop.hive.ql.parse.QBSubQueryParseInfo;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.SubqueryType;
-import org.apache.hadoop.hive.ql.udf.SettableUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -133,7 +129,7 @@ public class RexNodeExprFactory extends ExprFactory<RexNode> {
       throw new CalciteSemanticException("Unexpected error: Cannot find column");
     }
     return rexBuilder.makeInputRef(
-        TypeConverter.convert(colInfo.getType(), rexBuilder.getTypeFactory()), index + offset);
+        TypeConverter.convert(colInfo.getType(), colInfo.isNullable(), rexBuilder.getTypeFactory()), index + offset);
   }
 
   private static RexNode toPrimitiveConstDesc(
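
Because the input reference now carries the column's real nullability, downstream Calcite rewrites can fold predicates such as IS NOT NULL on constrained columns; the query45 plan diffs below, where the count()-based null check and the cross-product join disappear, appear to be a direct consequence. A minimal sketch of the changed call, assuming colInfo, rexBuilder, index, and offset are in scope as in the surrounding method:

    // colInfo, rexBuilder, index, offset: assumed in scope, as in the method above.
    RelDataType colType = TypeConverter.convert(
        colInfo.getType(), colInfo.isNullable(), rexBuilder.getTypeFactory());
    RexNode inputRef = rexBuilder.makeInputRef(colType, index + offset);
    // inputRef.getType().isNullable() now reflects the table's constraints.
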
diff --git a/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/cbo_query45.q.out b/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/cbo_query45.q.out
index 47a3c35..b2de1c3 100644
--- a/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/cbo_query45.q.out
+++ b/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/cbo_query45.q.out
@@ -1,21 +1,15 @@
-Warning: Map Join MAPJOIN[127][bigTable=?] in task 'Map 1' is a cross product
 CBO PLAN:
 HiveSortLimit(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[100])
   HiveProject(ca_zip=[$1], ca_county=[$0], $f2=[$2])
     HiveAggregate(group=[{7, 8}], agg#0=[sum($2)])
-      HiveFilter(condition=[OR(AND(<>($14, 0), IS NOT NULL($16)), IN(substr($8, 1, 5), _UTF-16LE'85669':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'86197':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'88274':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'83405':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'86475':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'85392':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'85460':VARCHAR(21 [...]
-        HiveProject(ws_item_sk=[$6], ws_bill_customer_sk=[$7], ws_sales_price=[$8], ws_sold_date_sk=[$9], c_customer_sk=[$0], c_current_addr_sk=[$1], ca_address_sk=[$3], ca_county=[$4], ca_zip=[$5], d_date_sk=[$10], d_year=[$11], d_qoy=[$12], i_item_sk=[$13], i_item_id=[$14], c=[$2], i_item_id0=[$15], literalTrue=[$16])
-          HiveJoin(condition=[=($6, $13)], joinType=[inner], algorithm=[none], cost=[not available])
-            HiveJoin(condition=[=($7, $0)], joinType=[inner], algorithm=[none], cost=[not available])
-              HiveJoin(condition=[=($1, $3)], joinType=[inner], algorithm=[none], cost=[not available])
-                HiveJoin(condition=[true], joinType=[inner], algorithm=[none], cost=[not available])
-                  HiveProject(c_customer_sk=[$0], c_current_addr_sk=[$4])
-                    HiveFilter(condition=[IS NOT NULL($4)])
-                      HiveTableScan(table=[[default, customer]], table:alias=[customer])
-                  HiveProject(c=[$0])
-                    HiveAggregate(group=[{}], c=[COUNT()])
-                      HiveFilter(condition=[IN($0, 2:BIGINT, 3:BIGINT, 5:BIGINT, 7:BIGINT, 11:BIGINT, 13:BIGINT, 17:BIGINT, 19:BIGINT, 23:BIGINT, 29:BIGINT)])
-                        HiveTableScan(table=[[default, item]], table:alias=[item])
+      HiveFilter(condition=[OR(IS NOT NULL($15), IN(substr($8, 1, 5), _UTF-16LE'85669':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'86197':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'88274':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'83405':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'86475':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'85392':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'85460':VARCHAR(2147483647) CHARACT [...]
+        HiveProject(ws_item_sk=[$5], ws_bill_customer_sk=[$6], ws_sales_price=[$7], ws_sold_date_sk=[$8], c_customer_sk=[$0], c_current_addr_sk=[$1], ca_address_sk=[$2], ca_county=[$3], ca_zip=[$4], d_date_sk=[$9], d_year=[$10], d_qoy=[$11], i_item_sk=[$12], i_item_id=[$13], i_item_id0=[$14], literalTrue=[$15])
+          HiveJoin(condition=[=($5, $12)], joinType=[inner], algorithm=[none], cost=[not available])
+            HiveJoin(condition=[=($6, $0)], joinType=[inner], algorithm=[none], cost=[not available])
+              HiveJoin(condition=[=($1, $2)], joinType=[inner], algorithm=[none], cost=[not available])
+                HiveProject(c_customer_sk=[$0], c_current_addr_sk=[$4])
+                  HiveFilter(condition=[IS NOT NULL($4)])
+                    HiveTableScan(table=[[default, customer]], table:alias=[customer])
                 HiveProject(ca_address_sk=[$0], ca_county=[$7], ca_zip=[$9])
                   HiveTableScan(table=[[default, customer_address]], table:alias=[customer_address])
               HiveProject(ws_item_sk=[$0], ws_bill_customer_sk=[$1], ws_sales_price=[$2], ws_sold_date_sk=[$3], d_date_sk=[$4], d_year=[$5], d_qoy=[$6])
diff --git a/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/query45.q.out b/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/query45.q.out
index e75a25d..90e29e7 100644
--- a/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/query45.q.out
+++ b/ql/src/test/results/clientpositive/perf/tpcds30tb/tez/query45.q.out
@@ -1,4 +1,3 @@
-Warning: Map Join MAPJOIN[127][bigTable=?] in task 'Map 1' is a cross product
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
   Stage-0 depends on stages: Stage-1
@@ -8,15 +7,13 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Map 1 <- Reducer 4 (BROADCAST_EDGE)
-        Map 3 <- Reducer 12 (BROADCAST_EDGE)
-        Map 6 <- Map 10 (BROADCAST_EDGE)
-        Reducer 12 <- Map 11 (SIMPLE_EDGE)
-        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 5 (CUSTOM_SIMPLE_EDGE)
-        Reducer 4 <- Map 3 (CUSTOM_SIMPLE_EDGE)
-        Reducer 7 <- Map 3 (BROADCAST_EDGE), Map 6 (CUSTOM_SIMPLE_EDGE), Reducer 2 (CUSTOM_SIMPLE_EDGE)
-        Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
-        Reducer 9 <- Reducer 8 (SIMPLE_EDGE)
+        Map 4 <- Map 8 (BROADCAST_EDGE)
+        Map 9 <- Reducer 11 (BROADCAST_EDGE)
+        Reducer 11 <- Map 10 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Map 3 (CUSTOM_SIMPLE_EDGE)
+        Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE), Map 9 (BROADCAST_EDGE), Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Reducer 6 <- Reducer 5 (SIMPLE_EDGE)
+        Reducer 7 <- Reducer 6 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -32,63 +29,16 @@ STAGE PLANS:
                       expressions: c_customer_sk (type: bigint), c_current_addr_sk (type: bigint)
                       outputColumnNames: _col0, _col1
                       Statistics: Num rows: 80000000 Data size: 1280000000 Basic stats: COMPLETE Column stats: COMPLETE
-                      Map Join Operator
-                        condition map:
-                             Inner Join 0 to 1
-                        keys:
-                          0 
-                          1 
-                        outputColumnNames: _col0, _col1, _col2
-                        input vertices:
-                          1 Reducer 4
-                        Statistics: Num rows: 80000000 Data size: 1920000000 Basic stats: COMPLETE Column stats: COMPLETE
-                        Reduce Output Operator
-                          key expressions: _col1 (type: bigint)
-                          null sort order: z
-                          sort order: +
-                          Map-reduce partition columns: _col1 (type: bigint)
-                          Statistics: Num rows: 80000000 Data size: 1920000000 Basic stats: COMPLETE Column stats: COMPLETE
-                          value expressions: _col0 (type: bigint), _col2 (type: bigint)
-            Execution mode: vectorized, llap
-            LLAP IO: may be used (ACID table)
-        Map 10 
-            Map Operator Tree:
-                TableScan
-                  alias: date_dim
-                  filterExpr: ((d_year = 2000) and (d_qoy = 2)) (type: boolean)
-                  Statistics: Num rows: 73049 Data size: 1168784 Basic stats: COMPLETE Column stats: COMPLETE
-                  Filter Operator
-                    predicate: ((d_year = 2000) and (d_qoy = 2)) (type: boolean)
-                    Statistics: Num rows: 92 Data size: 1472 Basic stats: COMPLETE Column stats: COMPLETE
-                    Select Operator
-                      expressions: d_date_sk (type: bigint)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 92 Data size: 736 Basic stats: COMPLETE Column stats: COMPLETE
                       Reduce Output Operator
-                        key expressions: _col0 (type: bigint)
+                        key expressions: _col1 (type: bigint)
                         null sort order: z
                         sort order: +
-                        Map-reduce partition columns: _col0 (type: bigint)
-                        Statistics: Num rows: 92 Data size: 736 Basic stats: COMPLETE Column stats: COMPLETE
-                      Select Operator
-                        expressions: _col0 (type: bigint)
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 92 Data size: 736 Basic stats: COMPLETE Column stats: COMPLETE
-                        Group By Operator
-                          keys: _col0 (type: bigint)
-                          minReductionHashAggr: 0.4
-                          mode: hash
-                          outputColumnNames: _col0
-                          Statistics: Num rows: 92 Data size: 736 Basic stats: COMPLETE Column stats: COMPLETE
-                          Dynamic Partitioning Event Operator
-                            Target column: ws_sold_date_sk (bigint)
-                            Target Input: web_sales
-                            Partition key expr: ws_sold_date_sk
-                            Statistics: Num rows: 92 Data size: 736 Basic stats: COMPLETE Column stats: COMPLETE
-                            Target Vertex: Map 6
+                        Map-reduce partition columns: _col1 (type: bigint)
+                        Statistics: Num rows: 80000000 Data size: 1280000000 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: bigint)
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
-        Map 11 
+        Map 10 
             Map Operator Tree:
                 TableScan
                   alias: item
@@ -118,50 +68,6 @@ STAGE PLANS:
         Map 3 
             Map Operator Tree:
                 TableScan
-                  alias: item
-                  Statistics: Num rows: 462000 Data size: 3696000 Basic stats: COMPLETE Column stats: COMPLETE
-                  Filter Operator
-                    predicate: (i_item_sk) IN (2L, 3L, 5L, 7L, 11L, 13L, 17L, 19L, 23L, 29L) (type: boolean)
-                    Statistics: Num rows: 10 Data size: 80 Basic stats: COMPLETE Column stats: COMPLETE
-                    Select Operator
-                      Statistics: Num rows: 10 Data size: 80 Basic stats: COMPLETE Column stats: COMPLETE
-                      Group By Operator
-                        aggregations: count()
-                        minReductionHashAggr: 0.9
-                        mode: hash
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                        Reduce Output Operator
-                          null sort order: 
-                          sort order: 
-                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                          value expressions: _col0 (type: bigint)
-                  Select Operator
-                    expressions: i_item_sk (type: bigint), i_item_id (type: string)
-                    outputColumnNames: _col0, _col1
-                    Statistics: Num rows: 462000 Data size: 49896000 Basic stats: COMPLETE Column stats: COMPLETE
-                    Map Join Operator
-                      condition map:
-                           Left Outer Join 0 to 1
-                      keys:
-                        0 _col1 (type: string)
-                        1 _col0 (type: string)
-                      outputColumnNames: _col0, _col3
-                      input vertices:
-                        1 Reducer 12
-                      Statistics: Num rows: 462018 Data size: 3696220 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: bigint)
-                        null sort order: z
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: bigint)
-                        Statistics: Num rows: 462018 Data size: 3696220 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col3 (type: boolean)
-            Execution mode: vectorized, llap
-            LLAP IO: may be used (ACID table)
-        Map 5 
-            Map Operator Tree:
-                TableScan
                   alias: customer_address
                   Statistics: Num rows: 40000000 Data size: 7800000000 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
@@ -177,7 +83,7 @@ STAGE PLANS:
                       value expressions: _col1 (type: varchar(30)), _col2 (type: char(10))
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
-        Map 6 
+        Map 4 
             Map Operator Tree:
                 TableScan
                   alias: web_sales
@@ -198,7 +104,7 @@ STAGE PLANS:
                           1 _col0 (type: bigint)
                         outputColumnNames: _col0, _col1, _col2
                         input vertices:
-                          1 Map 10
+                          1 Map 8
                         Statistics: Num rows: 1087859571 Data size: 138922052728 Basic stats: COMPLETE Column stats: COMPLETE
                         Reduce Output Operator
                           key expressions: _col1 (type: bigint)
@@ -209,7 +115,72 @@ STAGE PLANS:
                           value expressions: _col0 (type: bigint), _col2 (type: decimal(7,2))
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
-        Reducer 12 
+        Map 8 
+            Map Operator Tree:
+                TableScan
+                  alias: date_dim
+                  filterExpr: ((d_year = 2000) and (d_qoy = 2)) (type: boolean)
+                  Statistics: Num rows: 73049 Data size: 1168784 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: ((d_year = 2000) and (d_qoy = 2)) (type: boolean)
+                    Statistics: Num rows: 92 Data size: 1472 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: d_date_sk (type: bigint)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 92 Data size: 736 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: bigint)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: bigint)
+                        Statistics: Num rows: 92 Data size: 736 Basic stats: COMPLETE Column stats: COMPLETE
+                      Select Operator
+                        expressions: _col0 (type: bigint)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 92 Data size: 736 Basic stats: COMPLETE Column stats: COMPLETE
+                        Group By Operator
+                          keys: _col0 (type: bigint)
+                          minReductionHashAggr: 0.4
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 92 Data size: 736 Basic stats: COMPLETE Column stats: COMPLETE
+                          Dynamic Partitioning Event Operator
+                            Target column: ws_sold_date_sk (bigint)
+                            Target Input: web_sales
+                            Partition key expr: ws_sold_date_sk
+                            Statistics: Num rows: 92 Data size: 736 Basic stats: COMPLETE Column stats: COMPLETE
+                            Target Vertex: Map 4
+            Execution mode: vectorized, llap
+            LLAP IO: may be used (ACID table)
+        Map 9 
+            Map Operator Tree:
+                TableScan
+                  alias: item
+                  Statistics: Num rows: 462000 Data size: 49896000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: i_item_sk (type: bigint), i_item_id (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 462000 Data size: 49896000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Map Join Operator
+                      condition map:
+                           Left Outer Join 0 to 1
+                      keys:
+                        0 _col1 (type: string)
+                        1 _col0 (type: string)
+                      outputColumnNames: _col0, _col3
+                      input vertices:
+                        1 Reducer 11
+                      Statistics: Num rows: 462018 Data size: 3696220 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: bigint)
+                        null sort order: z
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: bigint)
+                        Statistics: Num rows: 462018 Data size: 3696220 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col3 (type: boolean)
+            Execution mode: vectorized, llap
+            LLAP IO: may be used (ACID table)
+        Reducer 11 
             Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
@@ -237,32 +208,19 @@ STAGE PLANS:
                 keys:
                   0 KEY.reducesinkkey0 (type: bigint)
                   1 KEY.reducesinkkey0 (type: bigint)
-                outputColumnNames: _col0, _col2, _col4, _col5
+                outputColumnNames: _col0, _col3, _col4
                 input vertices:
-                  1 Map 5
-                Statistics: Num rows: 80000000 Data size: 16240000000 Basic stats: COMPLETE Column stats: COMPLETE
+                  1 Map 3
+                Statistics: Num rows: 80000000 Data size: 15600000000 Basic stats: COMPLETE Column stats: COMPLETE
                 DynamicPartitionHashJoin: true
                 Reduce Output Operator
                   key expressions: _col0 (type: bigint)
                   null sort order: z
                   sort order: +
                   Map-reduce partition columns: _col0 (type: bigint)
-                  Statistics: Num rows: 80000000 Data size: 16240000000 Basic stats: COMPLETE Column stats: COMPLETE
-                  value expressions: _col2 (type: bigint), _col4 (type: varchar(30)), _col5 (type: char(10))
-        Reducer 4 
-            Execution mode: vectorized, llap
-            Reduce Operator Tree:
-              Group By Operator
-                aggregations: count(VALUE._col0)
-                mode: mergepartial
-                outputColumnNames: _col0
-                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                Reduce Output Operator
-                  null sort order: 
-                  sort order: 
-                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                  value expressions: _col0 (type: bigint)
-        Reducer 7 
+                  Statistics: Num rows: 80000000 Data size: 15600000000 Basic stats: COMPLETE Column stats: COMPLETE
+                  value expressions: _col3 (type: varchar(30)), _col4 (type: char(10))
+        Reducer 5 
             Execution mode: vectorized, llap
             Reduce Operator Tree:
               Map Join Operator
@@ -271,38 +229,38 @@ STAGE PLANS:
                 keys:
                   0 KEY.reducesinkkey0 (type: bigint)
                   1 KEY.reducesinkkey0 (type: bigint)
-                outputColumnNames: _col2, _col4, _col5, _col6, _col8
+                outputColumnNames: _col3, _col4, _col5, _col7
                 input vertices:
                   0 Reducer 2
-                Statistics: Num rows: 1087859571 Data size: 342373338881 Basic stats: COMPLETE Column stats: COMPLETE
+                Statistics: Num rows: 1087859571 Data size: 333670462313 Basic stats: COMPLETE Column stats: COMPLETE
                 DynamicPartitionHashJoin: true
                 Map Join Operator
                   condition map:
                        Inner Join 0 to 1
                   keys:
-                    0 _col6 (type: bigint)
+                    0 _col5 (type: bigint)
                     1 _col0 (type: bigint)
-                  outputColumnNames: _col2, _col4, _col5, _col8, _col16
+                  outputColumnNames: _col3, _col4, _col7, _col15
                   input vertices:
-                    1 Map 3
-                  Statistics: Num rows: 1087859571 Data size: 338020052601 Basic stats: COMPLETE Column stats: COMPLETE
+                    1 Map 9
+                  Statistics: Num rows: 1087859571 Data size: 329317176033 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
-                    expressions: _col8 (type: decimal(7,2)), _col4 (type: varchar(30)), _col5 (type: char(10)), _col2 (type: bigint), _col16 (type: boolean)
-                    outputColumnNames: _col2, _col7, _col8, _col14, _col16
-                    Statistics: Num rows: 1087859571 Data size: 338020052601 Basic stats: COMPLETE Column stats: COMPLETE
+                    expressions: _col7 (type: decimal(7,2)), _col3 (type: varchar(30)), _col4 (type: char(10)), _col15 (type: boolean)
+                    outputColumnNames: _col2, _col7, _col8, _col15
+                    Statistics: Num rows: 1087859571 Data size: 329317176033 Basic stats: COMPLETE Column stats: COMPLETE
                     Filter Operator
-                      predicate: (((_col14 <> 0L) and _col16 is not null) or (substr(_col8, 1, 5)) IN ('85669', '86197', '88274', '83405', '86475', '85392', '85460', '80348', '81792')) (type: boolean)
-                      Statistics: Num rows: 1087859571 Data size: 338020052601 Basic stats: COMPLETE Column stats: COMPLETE
+                      predicate: (_col15 is not null or (substr(_col8, 1, 5)) IN ('85669', '86197', '88274', '83405', '86475', '85392', '85460', '80348', '81792')) (type: boolean)
+                      Statistics: Num rows: 1087859571 Data size: 329317176033 Basic stats: COMPLETE Column stats: COMPLETE
                       Top N Key Operator
                         sort order: ++
                         keys: _col8 (type: char(10)), _col7 (type: varchar(30))
                         null sort order: zz
-                        Statistics: Num rows: 1087859571 Data size: 338020052601 Basic stats: COMPLETE Column stats: COMPLETE
+                        Statistics: Num rows: 1087859571 Data size: 329317176033 Basic stats: COMPLETE Column stats: COMPLETE
                         top n: 100
                         Select Operator
                           expressions: _col2 (type: decimal(7,2)), _col7 (type: varchar(30)), _col8 (type: char(10))
                           outputColumnNames: _col2, _col7, _col8
-                          Statistics: Num rows: 1087859571 Data size: 338020052601 Basic stats: COMPLETE Column stats: COMPLETE
+                          Statistics: Num rows: 1087859571 Data size: 329317176033 Basic stats: COMPLETE Column stats: COMPLETE
                           Group By Operator
                             aggregations: sum(_col2)
                             keys: _col8 (type: char(10)), _col7 (type: varchar(30))
@@ -317,7 +275,7 @@ STAGE PLANS:
                               Map-reduce partition columns: _col0 (type: char(10)), _col1 (type: varchar(30))
                               Statistics: Num rows: 1087859571 Data size: 325270011729 Basic stats: COMPLETE Column stats: COMPLETE
                               value expressions: _col2 (type: decimal(17,2))
-        Reducer 8 
+        Reducer 6 
             Execution mode: vectorized, llap
             Reduce Operator Tree:
               Group By Operator
@@ -332,7 +290,7 @@ STAGE PLANS:
                   sort order: ++
                   Statistics: Num rows: 18408340 Data size: 5504093660 Basic stats: COMPLETE Column stats: COMPLETE
                   value expressions: _col2 (type: decimal(17,2))
-        Reducer 9 
+        Reducer 7 
             Execution mode: vectorized, llap
             Reduce Operator Tree:
               Select Operator