You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/08/25 23:28:33 UTC

[03/50] [abbrv] hive git commit: HIVE-11464: lineage info missing if there are multiple outputs (Jimmy)

HIVE-11464: lineage info missing if there are multiple outputs (Jimmy)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1a75644d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1a75644d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1a75644d

Branch: refs/heads/llap
Commit: 1a75644d68c8c61fbafb4058fe45b7823492491c
Parents: f26b256
Author: Jimmy Xiang <jx...@cloudera.com>
Authored: Wed Aug 5 08:02:50 2015 -0700
Committer: Jimmy Xiang <jx...@cloudera.com>
Committed: Thu Aug 13 13:44:03 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  8 ++--
 .../hadoop/hive/ql/hooks/LineageInfo.java       |  9 ++--
 .../hadoop/hive/ql/hooks/LineageLogger.java     | 44 +++++++++++++-------
 .../ql/optimizer/lineage/ExprProcFactory.java   |  9 ++--
 .../hive/ql/optimizer/lineage/LineageCtx.java   | 34 +++++++++++----
 .../ql/optimizer/lineage/OpProcFactory.java     | 10 ++---
 ql/src/test/queries/clientpositive/lineage3.q   | 15 +++++++
 .../test/results/clientpositive/lineage3.q.out  | 32 +++++++++++++-
 8 files changed, 118 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1a75644d/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index e7b7b55..c0c1b2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -441,8 +441,11 @@ public class Driver implements CommandProcessor {
       // to avoid returning sensitive data
       String queryStr = HookUtils.redactLogString(conf, command);
 
+      // get the output schema
+      schema = getSchema(sem, conf);
+
       plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
-        SessionState.get().getHiveOperation(), getSchema(sem, conf));
+        SessionState.get().getHiveOperation(), schema);
 
       conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);
 
@@ -454,9 +457,6 @@ public class Driver implements CommandProcessor {
         plan.getFetchTask().initialize(conf, plan, null);
       }
 
-      // get the output schema
-      schema = getSchema(sem, conf);
-
       //do the authorization check
       if (!sem.skipAuthorization() &&
           HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/1a75644d/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
index fe0841e..2806c54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -375,9 +374,9 @@ public class LineageInfo implements Serializable {
     private String expr;
 
     /**
-     * The list of base columns that the particular column depends on.
+     * The set of base columns that the particular column depends on.
      */
-    private List<BaseColumnInfo> baseCols;
+    private Set<BaseColumnInfo> baseCols;
 
     /**
      * @return the type
@@ -410,14 +409,14 @@ public class LineageInfo implements Serializable {
     /**
      * @return the baseCols
      */
-    public List<BaseColumnInfo> getBaseCols() {
+    public Set<BaseColumnInfo> getBaseCols() {
       return baseCols;
     }
 
     /**
      * @param baseCols the baseCols to set
      */
-    public void setBaseCols(List<BaseColumnInfo> baseCols) {
+    public void setBaseCols(Set<BaseColumnInfo> baseCols) {
       this.baseCols = baseCols;
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1a75644d/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
index d615372..3c6ce94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
@@ -33,6 +33,7 @@ import org.apache.commons.io.output.StringBuilderWriter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -147,6 +148,7 @@ public class LineageLogger implements ExecuteWithHookContext {
           // Don't emit user/timestamp info in test mode,
           // so that the test golden output file is fixed.
           long queryTime = plan.getQueryStartTime().longValue();
+          if (queryTime == 0) queryTime = System.currentTimeMillis();
           writer.name("user").value(hookContext.getUgi().getUserName());
           writer.name("timestamp").value(queryTime/1000);
           writer.name("jobIds");
@@ -209,23 +211,28 @@ public class LineageLogger implements ExecuteWithHookContext {
    * For each target column, find out its sources based on the dependency index.
    */
   private List<Edge> getEdges(QueryPlan plan, Index index) {
-    List<FieldSchema> fieldSchemas = plan.getResultSchema().getFieldSchemas();
-    int fields = fieldSchemas == null ? 0 : fieldSchemas.size();
-    SelectOperator finalSelOp = index.getFinalSelectOp();
+    LinkedHashMap<String, ObjectPair<SelectOperator,
+      org.apache.hadoop.hive.ql.metadata.Table>> finalSelOps = index.getFinalSelectOps();
+    Set<Vertex> allTargets = new LinkedHashSet<Vertex>();
+    Map<String, Vertex> allSources = new LinkedHashMap<String, Vertex>();
     List<Edge> edges = new ArrayList<Edge>();
-    if (finalSelOp != null && fields > 0) {
-      Map<ColumnInfo, Dependency> colMap = index.getDependencies(finalSelOp);
-      List<Dependency> dependencies = colMap != null ? Lists.newArrayList(colMap.values()) : null;
-      if (dependencies == null || dependencies.size() != fields) {
-        log("Result schema has " + fields
-          + " fields, but we don't get as many dependencies");
+    for (ObjectPair<SelectOperator,
+        org.apache.hadoop.hive.ql.metadata.Table> pair: finalSelOps.values()) {
+      List<FieldSchema> fieldSchemas = plan.getResultSchema().getFieldSchemas();
+      SelectOperator finalSelOp = pair.getFirst();
+      org.apache.hadoop.hive.ql.metadata.Table t = pair.getSecond();
+      String destTableName = null;
+      List<String> colNames = null;
+      if (t != null) {
+        destTableName = t.getDbName() + "." + t.getTableName();
+        fieldSchemas = t.getCols();
       } else {
-        String destTableName = null;
-        List<String> colNames = null;
         // Based on the plan outputs, find out the target table name and column names.
         for (WriteEntity output : plan.getOutputs()) {
-          if (output.getType() == Entity.Type.TABLE) {
-            org.apache.hadoop.hive.ql.metadata.Table t = output.getTable();
+          Entity.Type entityType = output.getType();
+          if (entityType == Entity.Type.TABLE
+              || entityType == Entity.Type.PARTITION) {
+            t = output.getTable();
             destTableName = t.getDbName() + "." + t.getTableName();
             List<FieldSchema> cols = t.getCols();
             if (cols != null && !cols.isEmpty()) {
@@ -234,10 +241,15 @@ public class LineageLogger implements ExecuteWithHookContext {
             break;
           }
         }
-
+      }
+      int fields = fieldSchemas.size();
+      Map<ColumnInfo, Dependency> colMap = index.getDependencies(finalSelOp);
+      List<Dependency> dependencies = colMap != null ? Lists.newArrayList(colMap.values()) : null;
+      if (dependencies == null || dependencies.size() != fields) {
+        log("Result schema has " + fields
+          + " fields, but we don't get as many dependencies");
+      } else {
         // Go through each target column, generate the lineage edges.
-        Set<Vertex> allTargets = new LinkedHashSet<Vertex>();
-        Map<String, Vertex> allSources = new LinkedHashMap<String, Vertex>();
         for (int i = 0; i < fields; i++) {
           Vertex target = new Vertex(
             getTargetFieldName(i, destTableName, colNames, fieldSchemas));

http://git-wip-us.apache.org/repos/asf/hive/blob/1a75644d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
index 455a525..38040e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
@@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -124,7 +125,7 @@ public class ExprProcFactory {
         bci_set.addAll(child_dep.getBaseCols());
       }
 
-      dep.setBaseCols(new ArrayList<BaseColumnInfo>(bci_set));
+      dep.setBaseCols(bci_set);
       dep.setType(new_type);
 
       return dep;
@@ -146,7 +147,7 @@ public class ExprProcFactory {
       // Create a dependency that has no basecols
       Dependency dep = new Dependency();
       dep.setType(LineageInfo.DependencyType.SIMPLE);
-      dep.setBaseCols(new ArrayList<BaseColumnInfo>());
+      dep.setBaseCols(new LinkedHashSet<BaseColumnInfo>());
 
       return dep;
     }
@@ -218,9 +219,9 @@ public class ExprProcFactory {
       Dependency dep = lctx.getIndex().getDependency(inpOp, internalName);
       if ((tabAlias == null || tabAlias.startsWith("_") || tabAlias.startsWith("$"))
           && (dep != null && dep.getType() == DependencyType.SIMPLE)) {
-        List<BaseColumnInfo> baseCols = dep.getBaseCols();
+        Set<BaseColumnInfo> baseCols = dep.getBaseCols();
         if (baseCols != null && !baseCols.isEmpty()) {
-          BaseColumnInfo baseCol = baseCols.get(0);
+          BaseColumnInfo baseCol = baseCols.iterator().next();
           tabAlias = baseCol.getTabAlias().getAlias();
           alias = baseCol.getColumn().getName();
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/1a75644d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
index d26d8da..c33d775 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
@@ -25,7 +25,9 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
@@ -33,6 +35,7 @@ import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
@@ -59,7 +62,11 @@ public class LineageCtx implements NodeProcessorCtx {
      */
     private final Map<Operator<? extends OperatorDesc>, Set<Predicate>> condMap;
 
-    private SelectOperator finalSelectOp;
+    /**
+     * A map from a final select operator id to the select operator
+     * and the corresponding target table in the case of an insert-into query.
+     */
+    private LinkedHashMap<String, ObjectPair<SelectOperator, Table>> finalSelectOps;
 
     /**
      * Constructor.
@@ -69,6 +76,8 @@ public class LineageCtx implements NodeProcessorCtx {
         new LinkedHashMap<Operator<? extends OperatorDesc>,
                           LinkedHashMap<ColumnInfo, Dependency>>();
       condMap = new HashMap<Operator<? extends OperatorDesc>, Set<Predicate>>();
+      finalSelectOps =
+        new LinkedHashMap<String, ObjectPair<SelectOperator, Table>>();
     }
 
     /**
@@ -146,7 +155,7 @@ public class LineageCtx implements NodeProcessorCtx {
         old_dep.setType(new_type);
         Set<BaseColumnInfo> bci_set = new LinkedHashSet<BaseColumnInfo>(old_dep.getBaseCols());
         bci_set.addAll(dep.getBaseCols());
-        old_dep.setBaseCols(new ArrayList<BaseColumnInfo>(bci_set));
+        old_dep.setBaseCols(bci_set);
         // TODO: Fix the expressions later.
         old_dep.setExpr(null);
       }
@@ -179,16 +188,27 @@ public class LineageCtx implements NodeProcessorCtx {
       return condMap.get(op);
     }
 
-    public void setFinalSelectOp(SelectOperator sop) {
-      finalSelectOp = sop;
+    public void addFinalSelectOp(
+        SelectOperator sop, Operator<? extends OperatorDesc> sinkOp) {
+      String operatorId = sop.getOperatorId();
+      if (!finalSelectOps.containsKey(operatorId)) {
+        Table table = null;
+        if (sinkOp instanceof FileSinkOperator) {
+          FileSinkOperator fso = (FileSinkOperator) sinkOp;
+          table = fso.getConf().getTable();
+        }
+        finalSelectOps.put(operatorId,
+          new ObjectPair<SelectOperator, Table>(sop, table));
+      }
     }
 
-    public SelectOperator getFinalSelectOp() {
-      return finalSelectOp;
+    public LinkedHashMap<String,
+        ObjectPair<SelectOperator, Table>> getFinalSelectOps() {
+      return finalSelectOps;
     }
 
     public void clear() {
-      finalSelectOp = null;
+      finalSelectOps.clear();
       depMap.clear();
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/1a75644d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
index f670db8..5c5d0d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
@@ -120,7 +120,7 @@ public class OpProcFactory {
       }
 
       dep.setType(new_type);
-      dep.setBaseCols(new ArrayList<BaseColumnInfo>(col_set));
+      dep.setBaseCols(col_set);
 
       boolean isScript = op instanceof ScriptOperator;
 
@@ -186,7 +186,7 @@ public class OpProcFactory {
 
         // Populate the dependency
         dep.setType(LineageInfo.DependencyType.SIMPLE);
-        dep.setBaseCols(new ArrayList<BaseColumnInfo>());
+        dep.setBaseCols(new LinkedHashSet<BaseColumnInfo>());
         dep.getBaseCols().add(bci);
 
         // Put the dependency in the map
@@ -396,7 +396,7 @@ public class OpProcFactory {
       }
       if (op == null || (op.getChildOperators().isEmpty()
           && op instanceof FileSinkOperator)) {
-        lctx.getIndex().setFinalSelectOp(sop);
+        lctx.getIndex().addFinalSelectOp(sop, op);
       }
 
       return null;
@@ -450,7 +450,7 @@ public class OpProcFactory {
             new_type = LineageCtx.getNewDependencyType(expr_dep.getType(), new_type);
             bci_set.addAll(expr_dep.getBaseCols());
             if (expr_dep.getType() == LineageInfo.DependencyType.SIMPLE) {
-              BaseColumnInfo col = expr_dep.getBaseCols().get(0);
+              BaseColumnInfo col = expr_dep.getBaseCols().iterator().next();
               Table t = col.getTabAlias().getTable();
               if (t != null) {
                 sb.append(t.getDbName()).append(".").append(t.getTableName()).append(".");
@@ -514,7 +514,7 @@ public class OpProcFactory {
           }
         }
 
-        dep.setBaseCols(new ArrayList<BaseColumnInfo>(bci_set));
+        dep.setBaseCols(bci_set);
         dep.setType(new_type);
         lctx.getIndex().putDependency(gop, col_infos.get(cnt++), dep);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/1a75644d/ql/src/test/queries/clientpositive/lineage3.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/lineage3.q b/ql/src/test/queries/clientpositive/lineage3.q
index 53fff0f..c24ff7d 100644
--- a/ql/src/test/queries/clientpositive/lineage3.q
+++ b/ql/src/test/queries/clientpositive/lineage3.q
@@ -1,5 +1,20 @@
 set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.LineageLogger;
 
+drop table if exists d1;
+create table d1(a int);
+
+from (select a.ctinyint x, b.cstring1 y
+from alltypesorc a join alltypesorc b on a.cint = b.cbigint) t
+insert into table d1 select x + length(y);
+
+drop table if exists d2;
+create table d2(b varchar(128));
+
+from (select a.ctinyint x, b.cstring1 y
+from alltypesorc a join alltypesorc b on a.cint = b.cbigint) t
+insert into table d1 select x where y is null
+insert into table d2 select y where x > 0;
+
 drop table if exists t;
 create table t as
 select * from

http://git-wip-us.apache.org/repos/asf/hive/blob/1a75644d/ql/src/test/results/clientpositive/lineage3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/lineage3.q.out b/ql/src/test/results/clientpositive/lineage3.q.out
index 75d88f8..b6b4e0b 100644
--- a/ql/src/test/results/clientpositive/lineage3.q.out
+++ b/ql/src/test/results/clientpositive/lineage3.q.out
@@ -1,3 +1,31 @@
+PREHOOK: query: drop table if exists d1
+PREHOOK: type: DROPTABLE
+PREHOOK: query: create table d1(a int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@d1
+PREHOOK: query: from (select a.ctinyint x, b.cstring1 y
+from alltypesorc a join alltypesorc b on a.cint = b.cbigint) t
+insert into table d1 select x + length(y)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@d1
+{"version":"1.0","engine":"mr","hash":"4c9b7b8d89403cef78668f15d393e542","queryText":"from (select a.ctinyint x, b.cstring1 y\nfrom alltypesorc a join alltypesorc b on a.cint = b.cbigint) t\ninsert into table d1 select x + length(y)","edges":[{"sources":[1,2],"targets":[0],"expression":"(UDFToInteger(a.ctinyint) + length(a.cstring1))","edgeType":"PROJECTION"},{"sources":[3,4],"targets":[0],"expression":"(UDFToLong(a.cint) = a.cbigint)","edgeType":"PREDICATE"}],"vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"default.d1.a"},{"id":1,"vertexType":"COLUMN","vertexId":"default.alltypesorc.ctinyint"},{"id":2,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cstring1"},{"id":3,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cint"},{"id":4,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cbigint"}]}
+PREHOOK: query: drop table if exists d2
+PREHOOK: type: DROPTABLE
+PREHOOK: query: create table d2(b varchar(128))
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@d2
+PREHOOK: query: from (select a.ctinyint x, b.cstring1 y
+from alltypesorc a join alltypesorc b on a.cint = b.cbigint) t
+insert into table d1 select x where y is null
+insert into table d2 select y where x > 0
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypesorc
+PREHOOK: Output: default@d1
+PREHOOK: Output: default@d2
+{"version":"1.0","engine":"mr","hash":"8703e4091ebd4c96afd3cac83e3a2957","queryText":"from (select a.ctinyint x, b.cstring1 y\nfrom alltypesorc a join alltypesorc b on a.cint = b.cbigint) t\ninsert into table d1 select x where y is null\ninsert into table d2 select y where x > 0","edges":[{"sources":[2],"targets":[0],"expression":"UDFToInteger(x)","edgeType":"PROJECTION"},{"sources":[3],"targets":[0,1],"expression":"t.y is null","edgeType":"PREDICATE"},{"sources":[4,5],"targets":[0,1],"expression":"(UDFToLong(a.cint) = b.cbigint)","edgeType":"PREDICATE"},{"sources":[3],"targets":[1],"expression":"CAST( y AS varchar(128))","edgeType":"PROJECTION"},{"sources":[2],"targets":[0,1],"expression":"(t.x > 0)","edgeType":"PREDICATE"}],"vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"default.d1.a"},{"id":1,"vertexType":"COLUMN","vertexId":"default.d2.b"},{"id":2,"vertexType":"COLUMN","vertexId":"default.alltypesorc.ctinyint"},{"id":3,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cstring1"},{"id":4,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cint"},{"id":5,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cbigint"}]}
 PREHOOK: query: drop table if exists t
 PREHOOK: type: DROPTABLE
 PREHOOK: query: create table t as
@@ -23,7 +51,7 @@ where cint is not null and cint < 0 order by cint, cs limit 5
 PREHOOK: type: QUERY
 PREHOOK: Input: default@alltypesorc
 PREHOOK: Output: default@dest_l1@ds=today
-{"version":"1.0","engine":"mr","hash":"2b5891d094ff74e23ec6acf5b4990f45","queryText":"insert into table dest_l1 partition (ds='today')\nselect cint, cast(cstring1 as varchar(128)) as cs\nfrom alltypesorc\nwhere cint is not null and cint < 0 order by cint, cs limit 5","edges":[{"sources":[2],"targets":[0],"edgeType":"PROJECTION"},{"sources":[3],"targets":[1],"expression":"CAST( alltypesorc.cstring1 AS varchar(128))","edgeType":"PROJECTION"},{"sources":[2],"targets":[0,1],"expression":"(alltypesorc.cint is not null and (alltypesorc.cint < 0))","edgeType":"PREDICATE"}],"vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"cint"},{"id":1,"vertexType":"COLUMN","vertexId":"cs"},{"id":2,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cint"},{"id":3,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cstring1"}]}
+{"version":"1.0","engine":"mr","hash":"2b5891d094ff74e23ec6acf5b4990f45","queryText":"insert into table dest_l1 partition (ds='today')\nselect cint, cast(cstring1 as varchar(128)) as cs\nfrom alltypesorc\nwhere cint is not null and cint < 0 order by cint, cs limit 5","edges":[{"sources":[2],"targets":[0],"edgeType":"PROJECTION"},{"sources":[3],"targets":[1],"expression":"CAST( alltypesorc.cstring1 AS varchar(128))","edgeType":"PROJECTION"},{"sources":[2],"targets":[0,1],"expression":"(alltypesorc.cint is not null and (alltypesorc.cint < 0))","edgeType":"PREDICATE"}],"vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"default.dest_l1.a"},{"id":1,"vertexType":"COLUMN","vertexId":"default.dest_l1.b"},{"id":2,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cint"},{"id":3,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cstring1"}]}
 PREHOOK: query: insert into table dest_l1 partition (ds='tomorrow')
 select min(cint), cast(min(cstring1) as varchar(128)) as cs
 from alltypesorc
@@ -33,7 +61,7 @@ having min(cbigint) > 10
 PREHOOK: type: QUERY
 PREHOOK: Input: default@alltypesorc
 PREHOOK: Output: default@dest_l1@ds=tomorrow
-{"version":"1.0","engine":"mr","hash":"4ad6338a8abfe3fe0342198fcbd1f11d","queryText":"insert into table dest_l1 partition (ds='tomorrow')\nselect min(cint), cast(min(cstring1) as varchar(128)) as cs\nfrom alltypesorc\nwhere cint is not null and cboolean1 = true\ngroup by csmallint\nhaving min(cbigint) > 10","edges":[{"sources":[2],"targets":[0],"expression":"min(default.alltypesorc.cint)","edgeType":"PROJECTION"},{"sources":[3],"targets":[1],"expression":"CAST( min(default.alltypesorc.cstring1) AS varchar(128))","edgeType":"PROJECTION"},{"sources":[2,4],"targets":[0,1],"expression":"(alltypesorc.cint is not null and (alltypesorc.cboolean1 = true))","edgeType":"PREDICATE"},{"sources":[5],"targets":[0,1],"expression":"(min(default.alltypesorc.cbigint) > 10)","edgeType":"PREDICATE"}],"vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"c0"},{"id":1,"vertexType":"COLUMN","vertexId":"cs"},{"id":2,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cint"},{"id":3,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cstring1"},{"id":4,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cboolean1"},{"id":5,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cbigint"}]}
+{"version":"1.0","engine":"mr","hash":"4ad6338a8abfe3fe0342198fcbd1f11d","queryText":"insert into table dest_l1 partition (ds='tomorrow')\nselect min(cint), cast(min(cstring1) as varchar(128)) as cs\nfrom alltypesorc\nwhere cint is not null and cboolean1 = true\ngroup by csmallint\nhaving min(cbigint) > 10","edges":[{"sources":[2],"targets":[0],"expression":"min(default.alltypesorc.cint)","edgeType":"PROJECTION"},{"sources":[3],"targets":[1],"expression":"CAST( min(default.alltypesorc.cstring1) AS varchar(128))","edgeType":"PROJECTION"},{"sources":[2,4],"targets":[0,1],"expression":"(alltypesorc.cint is not null and (alltypesorc.cboolean1 = true))","edgeType":"PREDICATE"},{"sources":[5],"targets":[0,1],"expression":"(min(default.alltypesorc.cbigint) > 10)","edgeType":"PREDICATE"}],"vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"default.dest_l1.a"},{"id":1,"vertexType":"COLUMN","vertexId":"default.dest_l1.b"},{"id":2,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cint"},{"id":3,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cstring1"},{"id":4,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cboolean1"},{"id":5,"vertexType":"COLUMN","vertexId":"default.alltypesorc.cbigint"}]}
 PREHOOK: query: select cint, rank() over(order by cint) from alltypesorc
 where cint > 10 and cint < 10000 limit 10
 PREHOOK: type: QUERY