Posted to commits@hive.apache.org by jx...@apache.org on 2015/07/02 20:26:21 UTC

[8/8] hive git commit: HIVE-11139: Emit more lineage information (Jimmy, reviewed by Szehon)

HIVE-11139: Emit more lineage information (Jimmy, reviewed by Szehon)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cdd1c7bf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cdd1c7bf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cdd1c7bf

Branch: refs/heads/master
Commit: cdd1c7bf775d788fc94eee6d33e8d630158195f1
Parents: 470d9c8
Author: Jimmy Xiang <jx...@cloudera.com>
Authored: Tue May 19 10:43:39 2015 -0700
Committer: Jimmy Xiang <jx...@cloudera.com>
Committed: Thu Jul 2 08:28:38 2015 -0700

----------------------------------------------------------------------
 pom.xml                                         |    1 +
 ql/pom.xml                                      |    5 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |    2 +-
 .../org/apache/hadoop/hive/ql/QueryPlan.java    |    9 +-
 .../hadoop/hive/ql/hooks/HookContext.java       |   12 +
 .../hadoop/hive/ql/hooks/LineageInfo.java       |   96 +
 .../hadoop/hive/ql/hooks/LineageLogger.java     |  441 +++
 .../ql/log/NoDeleteRollingFileAppender.java     |  176 ++
 .../ql/optimizer/lineage/ExprProcFactory.java   |   91 +
 .../hive/ql/optimizer/lineage/Generator.java    |   16 +-
 .../hive/ql/optimizer/lineage/LineageCtx.java   |   79 +-
 .../ql/optimizer/lineage/OpProcFactory.java     |  228 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   51 +-
 .../apache/hadoop/hive/ql/plan/FilterDesc.java  |   14 +
 .../apache/hadoop/hive/ql/plan/JoinDesc.java    |    4 +
 .../hadoop/hive/ql/session/LineageState.java    |    9 +-
 .../parse/TestUpdateDeleteSemanticAnalyzer.java |    2 +-
 ql/src/test/queries/clientpositive/lineage2.q   |  116 +
 ql/src/test/queries/clientpositive/lineage3.q   |  162 +
 .../alter_partition_change_col.q.out            |    8 +-
 .../clientpositive/alter_table_cascade.q.out    |    8 +-
 .../test/results/clientpositive/combine2.q.out  |   16 +-
 .../clientpositive/groupby_sort_1_23.q.out      |   10 +-
 .../clientpositive/groupby_sort_skew_1_23.q.out |   10 +-
 .../clientpositive/index_auto_mult_tables.q.out |   12 +
 .../index_auto_mult_tables_compact.q.out        |    9 +
 .../clientpositive/index_auto_partitioned.q.out |    9 +
 .../clientpositive/index_auto_update.q.out      |    2 +
 .../results/clientpositive/index_bitmap.q.out   |   24 +
 .../index_bitmap_auto_partitioned.q.out         |   12 +
 .../clientpositive/index_bitmap_rc.q.out        |   24 +
 .../results/clientpositive/index_compact.q.out  |   18 +
 .../clientpositive/index_compact_2.q.out        |   18 +
 ql/src/test/results/clientpositive/join34.q.out |    2 +-
 ql/src/test/results/clientpositive/join35.q.out |    2 +-
 .../test/results/clientpositive/lineage1.q.out  |    4 +-
 .../test/results/clientpositive/lineage2.q.out  | 2905 ++++++++++++++++++
 .../test/results/clientpositive/lineage3.q.out  | 2473 +++++++++++++++
 .../clientpositive/load_dyn_part13.q.out        |    8 +-
 .../results/clientpositive/multiMapJoin1.q.out  |   10 +-
 .../results/clientpositive/multi_insert.q.out   |   32 +-
 ...i_insert_move_tasks_share_dependencies.q.out |   32 +-
 .../orc_dictionary_threshold.q.out              |    2 +-
 ql/src/test/results/clientpositive/ptf.q.out    |   28 +-
 .../spark/groupby_sort_1_23.q.out               |   10 +-
 .../spark/groupby_sort_skew_1_23.q.out          |   10 +-
 .../results/clientpositive/spark/join34.q.out   |    2 +-
 .../results/clientpositive/spark/join35.q.out   |    2 +-
 .../clientpositive/spark/load_dyn_part13.q.out  |    8 +-
 .../clientpositive/spark/multi_insert.q.out     |   32 +-
 ...i_insert_move_tasks_share_dependencies.q.out |   32 +-
 .../test/results/clientpositive/spark/ptf.q.out |   28 +-
 .../results/clientpositive/spark/union22.q.out  |    4 +-
 .../results/clientpositive/spark/union28.q.out  |    4 +-
 .../results/clientpositive/spark/union29.q.out  |    4 +-
 .../results/clientpositive/spark/union30.q.out  |    4 +-
 .../results/clientpositive/spark/union33.q.out  |    4 +-
 .../clientpositive/spark/union_date_trim.q.out  |    4 +-
 .../clientpositive/spark/union_remove_1.q.out   |    4 +-
 .../clientpositive/spark/union_remove_10.q.out  |    2 +-
 .../clientpositive/spark/union_remove_11.q.out  |    2 +-
 .../clientpositive/spark/union_remove_15.q.out  |    8 +-
 .../clientpositive/spark/union_remove_16.q.out  |    8 +-
 .../clientpositive/spark/union_remove_17.q.out  |    4 +-
 .../clientpositive/spark/union_remove_18.q.out  |   24 +-
 .../clientpositive/spark/union_remove_19.q.out  |   12 +-
 .../clientpositive/spark/union_remove_2.q.out   |    2 +-
 .../clientpositive/spark/union_remove_20.q.out  |    4 +-
 .../clientpositive/spark/union_remove_21.q.out  |    2 +-
 .../clientpositive/spark/union_remove_22.q.out  |   12 +-
 .../clientpositive/spark/union_remove_23.q.out  |    2 +-
 .../clientpositive/spark/union_remove_24.q.out  |    4 +-
 .../clientpositive/spark/union_remove_25.q.out  |   16 +-
 .../clientpositive/spark/union_remove_3.q.out   |    2 +-
 .../clientpositive/spark/union_remove_4.q.out   |    4 +-
 .../clientpositive/spark/union_remove_5.q.out   |    2 +-
 .../clientpositive/spark/union_remove_6.q.out   |    8 +-
 .../spark/union_remove_6_subq.q.out             |    8 +-
 .../clientpositive/spark/union_remove_7.q.out   |    4 +-
 .../clientpositive/spark/union_remove_8.q.out   |    2 +-
 .../clientpositive/spark/union_remove_9.q.out   |    2 +-
 .../clientpositive/spark/union_top_level.q.out  |    4 +-
 .../clientpositive/spark/vectorized_ptf.q.out   |   28 +-
 .../clientpositive/spark/windowing.q.out        |   40 +-
 .../temp_table_windowing_expressions.q.out      |    8 +-
 .../test/results/clientpositive/tez/ptf.q.out   |   28 +-
 .../clientpositive/tez/unionDistinct_1.q.out    |   20 +-
 .../clientpositive/tez/vectorized_ptf.q.out     |   28 +-
 .../test/results/clientpositive/union22.q.out   |    4 +-
 .../test/results/clientpositive/union28.q.out   |    4 +-
 .../test/results/clientpositive/union29.q.out   |    4 +-
 .../test/results/clientpositive/union30.q.out   |    4 +-
 .../test/results/clientpositive/union33.q.out   |    4 +-
 .../clientpositive/unionDistinct_1.q.out        |   20 +-
 .../clientpositive/union_date_trim.q.out        |    4 +-
 .../results/clientpositive/union_remove_1.q.out |    4 +-
 .../clientpositive/union_remove_10.q.out        |    2 +-
 .../clientpositive/union_remove_11.q.out        |    2 +-
 .../clientpositive/union_remove_15.q.out        |    8 +-
 .../clientpositive/union_remove_16.q.out        |    8 +-
 .../clientpositive/union_remove_17.q.out        |    4 +-
 .../clientpositive/union_remove_18.q.out        |   24 +-
 .../clientpositive/union_remove_19.q.out        |   12 +-
 .../results/clientpositive/union_remove_2.q.out |    2 +-
 .../clientpositive/union_remove_20.q.out        |    4 +-
 .../clientpositive/union_remove_21.q.out        |    2 +-
 .../clientpositive/union_remove_22.q.out        |   12 +-
 .../clientpositive/union_remove_23.q.out        |    2 +-
 .../clientpositive/union_remove_24.q.out        |    4 +-
 .../clientpositive/union_remove_25.q.out        |   16 +-
 .../results/clientpositive/union_remove_3.q.out |    2 +-
 .../results/clientpositive/union_remove_4.q.out |    4 +-
 .../results/clientpositive/union_remove_5.q.out |    2 +-
 .../results/clientpositive/union_remove_6.q.out |    8 +-
 .../clientpositive/union_remove_6_subq.q.out    |    8 +-
 .../results/clientpositive/union_remove_7.q.out |    4 +-
 .../results/clientpositive/union_remove_8.q.out |    2 +-
 .../results/clientpositive/union_remove_9.q.out |    2 +-
 .../clientpositive/union_top_level.q.out        |    4 +-
 .../results/clientpositive/vectorized_ptf.q.out |   28 +-
 .../test/results/clientpositive/windowing.q.out |   40 +-
 .../clientpositive/windowing_expressions.q.out  |    8 +-
 122 files changed, 7398 insertions(+), 498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f84f3e9..f2cb761 100644
--- a/pom.xml
+++ b/pom.xml
@@ -174,6 +174,7 @@
     <felix.version>2.4.0</felix.version>
     <curator.version>2.6.0</curator.version>
     <jsr305.version>3.0.0</jsr305.version>
+    <gson.version>2.2.4</gson.version>
   </properties>
 
   <repositories>

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index b86b85d..6026c49 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -288,6 +288,11 @@
       <version>${javaewah.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>${gson.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.iq80.snappy</groupId>
       <artifactId>snappy</artifactId>
       <version>${snappy.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 669e6be..e04165b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -446,7 +446,7 @@ public class Driver implements CommandProcessor {
       String operationName = ctx.getExplain() ?
         HiveOperation.EXPLAIN.getOperationName() : SessionState.get().getCommandType();
       plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
-        operationName);
+        operationName, getSchema(sem, conf));
 
       conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index a0d61f5..29a3939 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
@@ -89,6 +90,7 @@ public class QueryPlan implements Serializable {
   protected LineageInfo linfo;
   private TableAccessInfo tableAccessInfo;
   private ColumnAccessInfo columnAccessInfo;
+  private Schema resultSchema;
 
   private HashMap<String, String> idToTableNameMap;
 
@@ -111,7 +113,7 @@ public class QueryPlan implements Serializable {
   }
 
   public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
-      String operationName) {
+      String operationName, Schema resultSchema) {
     this.queryString = queryString;
 
     rootTasks = new ArrayList<Task<? extends Serializable>>();
@@ -133,6 +135,7 @@ public class QueryPlan implements Serializable {
     queryProperties = sem.getQueryProperties();
     queryStartTime = startTime;
     this.operationName = operationName;
+    this.resultSchema = resultSchema;
   }
 
   public String getQueryStr() {
@@ -683,6 +686,10 @@ public class QueryPlan implements Serializable {
     this.outputs = outputs;
   }
 
+  public Schema getResultSchema() {
+    return resultSchema;
+  }
+
   public HashMap<String, String> getIdToTableNameMap() {
     return idToTableNameMap;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
index 0c6a938..bed17e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -47,6 +48,7 @@ public class HookContext {
   private Set<ReadEntity> inputs;
   private Set<WriteEntity> outputs;
   private LineageInfo linfo;
+  private Index depMap;
   private UserGroupInformation ugi;
   private HookType hookType;
   final private Map<String, ContentSummary> inputPathToContentSummary;
@@ -67,8 +69,10 @@ public class HookContext {
     outputs = queryPlan.getOutputs();
     ugi = Utils.getUGI();
     linfo= null;
+    depMap = null;
     if(SessionState.get() != null){
       linfo = SessionState.get().getLineageState().getLineageInfo();
+      depMap = SessionState.get().getLineageState().getIndex();
     }
     this.userName = userName;
     this.ipAddress = ipAddress;
@@ -127,6 +131,14 @@ public class HookContext {
     this.linfo = linfo;
   }
 
+  public Index getIndex() {
+    return depMap;
+  }
+
+  public void setIndex(Index depMap) {
+    this.depMap = depMap;
+  }
+
   public UserGroupInformation getUgi() {
     return ugi;
   }
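
A minimal, hypothetical sketch of how a custom post-execute hook could consume
the new getIndex() accessor. Only HookContext.getIndex(), Index.getFinalSelectOp()
and Index.getDependencies() come from this patch; the hook class, package name and
output below are invented for illustration.

  package org.example.hooks; // hypothetical package

  import java.util.Map;

  import org.apache.hadoop.hive.ql.exec.ColumnInfo;
  import org.apache.hadoop.hive.ql.exec.SelectOperator;
  import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
  import org.apache.hadoop.hive.ql.hooks.HookContext;
  import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
  import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;

  /** Hypothetical post-execute hook that prints per-column dependencies. */
  public class PrintLineageHook implements ExecuteWithHookContext {
    @Override
    public void run(HookContext hookContext) throws Exception {
      Index index = hookContext.getIndex();      // accessor added by this patch
      if (index == null) {
        return;                                  // lineage info not collected
      }
      SelectOperator finalSelOp = index.getFinalSelectOp();
      if (finalSelOp == null) {
        return;
      }
      Map<ColumnInfo, Dependency> deps = index.getDependencies(finalSelOp);
      if (deps == null) {
        return;
      }
      for (Map.Entry<ColumnInfo, Dependency> e : deps.entrySet()) {
        System.out.println(e.getKey().getInternalName() + " <- " + e.getValue().getBaseCols());
      }
    }
  }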

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
index f98b38b..fe0841e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
@@ -21,10 +21,13 @@ package org.apache.hadoop.hive.ql.hooks;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.collections.SetUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -260,6 +263,25 @@ public class LineageInfo implements Serializable {
     public String toString() {
       return tabAlias + ":" + column;
     }
+
+    @Override
+    public int hashCode() {
+      return (column != null ? column.hashCode() : 7)
+        + (tabAlias != null ? tabAlias.hashCode() : 11);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof BaseColumnInfo)) {
+        return false;
+      }
+      BaseColumnInfo ci = (BaseColumnInfo) obj;
+      return (column == null ? ci.column == null : column.equals(ci.column))
+        && (tabAlias == null ? ci.tabAlias == null : tabAlias.equals(ci.tabAlias));
+    }
   }
 
   public static class TableAliasInfo implements Serializable {
@@ -311,6 +333,25 @@ public class LineageInfo implements Serializable {
     public String toString() {
       return table.getDbName() + "." + table.getTableName() + "(" + alias + ")";
     }
+
+    @Override
+    public int hashCode() {
+      return (alias != null ? alias.hashCode() : 7)
+        + (table != null ? table.hashCode() : 11);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof TableAliasInfo)) {
+        return false;
+      }
+      TableAliasInfo tabAlias = (TableAliasInfo) obj;
+      return StringUtils.equals(alias, tabAlias.alias)
+        && (table == null ? tabAlias.table == null : table.equals(tabAlias.table));
+    }
   }
 
   /**
@@ -387,6 +428,61 @@ public class LineageInfo implements Serializable {
   }
 
   /**
+   * This class tracks the predicate information for an operator.
+   */
+  public static class Predicate {
+
+    /**
+     * Expression string for the predicate.
+     */
+    private String expr;
+
+    /**
+     * The set of base columns that the predicate depends on.
+     */
+    private Set<BaseColumnInfo> baseCols = new LinkedHashSet<BaseColumnInfo>();
+
+    /**
+     * @return the expr
+     */
+    public String getExpr() {
+      return expr;
+    }
+
+    /**
+     * @param expr the expr to set
+     */
+    public void setExpr(String expr) {
+      this.expr = expr;
+    }
+
+    /**
+     * @return the baseCols
+     */
+    public Set<BaseColumnInfo> getBaseCols() {
+      return baseCols;
+    }
+
+    @Override
+    public int hashCode() {
+      return baseCols.hashCode() + (expr != null ? expr.hashCode() : 11);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof Predicate)) {
+        return false;
+      }
+      Predicate cond = (Predicate) obj;
+      return StringUtils.equals(cond.expr, expr)
+        && SetUtils.isEqualSet(cond.baseCols, baseCols);
+    }
+  }
+
+  /**
    * The map contains an index from the (datacontainer, columnname) to the
    * dependency vector for that tuple. This is used to generate the
    * dependency vectors during the walk of the operator tree.
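
For illustration, a minimal sketch of how a Predicate gets populated for a filter
such as "src.key > 10". The table, alias and column values are invented; the setters
are the same ones used by ExprProcFactory.findSourceColumn() further down.

  import org.apache.hadoop.hive.metastore.api.FieldSchema;
  import org.apache.hadoop.hive.metastore.api.Table;
  import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
  import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;
  import org.apache.hadoop.hive.ql.hooks.LineageInfo.TableAliasInfo;

  public class PredicateSketch {
    public static void main(String[] args) {
      Table tbl = new Table();                   // metastore table "default.src"
      tbl.setDbName("default");
      tbl.setTableName("src");

      TableAliasInfo aliasInfo = new TableAliasInfo();
      aliasInfo.setTable(tbl);
      aliasInfo.setAlias("src");

      BaseColumnInfo keyCol = new BaseColumnInfo();
      keyCol.setColumn(new FieldSchema("key", "string", null));
      keyCol.setTabAlias(aliasInfo);

      Predicate cond = new Predicate();          // tracks the filter expression
      cond.setExpr("(src.key > 10)");
      cond.getBaseCols().add(keyCol);            // ...and the base columns it reads

      System.out.println(cond.getExpr() + " depends on " + cond.getBaseCols());
    }
  }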

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
new file mode 100644
index 0000000..fc32af7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
@@ -0,0 +1,441 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.SetUtils;
+import org.apache.commons.io.output.StringBuilderWriter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;
+import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.gson.stream.JsonWriter;
+
+/**
+ * Implementation of a post execute hook that logs lineage info to a log file.
+ */
+public class LineageLogger implements ExecuteWithHookContext {
+
+  private static final Log LOG = LogFactory.getLog(LineageLogger.class);
+
+  private static final HashSet<String> OPERATION_NAMES = new HashSet<String>();
+
+  static {
+    OPERATION_NAMES.add(HiveOperation.QUERY.getOperationName());
+    OPERATION_NAMES.add(HiveOperation.CREATETABLE_AS_SELECT.getOperationName());
+    OPERATION_NAMES.add(HiveOperation.ALTERVIEW_AS.getOperationName());
+    OPERATION_NAMES.add(HiveOperation.CREATEVIEW.getOperationName());
+  }
+
+  private static final String FORMAT_VERSION = "1.0";
+
+  final static class Edge {
+    public static enum Type {
+      PROJECTION, PREDICATE
+    }
+
+    private Set<Vertex> sources;
+    private Set<Vertex> targets;
+    private String expr;
+    private Type type;
+
+    Edge(Set<Vertex> sources, Set<Vertex> targets, String expr, Type type) {
+      this.sources = sources;
+      this.targets = targets;
+      this.expr = expr;
+      this.type = type;
+    }
+  }
+
+  final static class Vertex {
+    public static enum Type {
+      COLUMN, TABLE
+    }
+    private Type type;
+    private String label;
+    private int id;
+
+    Vertex(String label) {
+      this(label, Type.COLUMN);
+    }
+
+    Vertex(String label, Type type) {
+      this.label = label;
+      this.type = type;
+    }
+
+    @Override
+    public int hashCode() {
+      return label.hashCode() + type.hashCode() * 3;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof Vertex)) {
+        return false;
+      }
+      Vertex vertex = (Vertex) obj;
+      return label.equals(vertex.label) && type == vertex.type;
+    }
+  }
+
+  @Override
+  public void run(HookContext hookContext) {
+    assert(hookContext.getHookType() == HookType.POST_EXEC_HOOK);
+    QueryPlan plan = hookContext.getQueryPlan();
+    Index index = hookContext.getIndex();
+    SessionState ss = SessionState.get();
+    if (ss != null && index != null
+        && OPERATION_NAMES.contains(plan.getOperationName())) {
+      try {
+        StringBuilderWriter out = new StringBuilderWriter(1024);
+        JsonWriter writer = new JsonWriter(out);
+        writer.setIndent("  ");
+
+        out.append("POSTHOOK: LINEAGE: ");
+        String queryStr = plan.getQueryStr().trim();
+        writer.beginObject();
+        writer.name("version").value(FORMAT_VERSION);
+        HiveConf conf = ss.getConf();
+        boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);
+        if (!testMode) {
+          // Don't emit user/timestamp info in test mode,
+          // so that the test golden output file is fixed.
+          long queryTime = plan.getQueryStartTime().longValue();
+          writer.name("user").value(hookContext.getUgi().getUserName());
+          writer.name("timestamp").value(queryTime/1000);
+          writer.name("jobIds");
+          writer.beginArray();
+          List<TaskRunner> tasks = hookContext.getCompleteTaskList();
+          if (tasks != null && !tasks.isEmpty()) {
+            for (TaskRunner task: tasks) {
+              String jobId = task.getTask().getJobID();
+              if (jobId != null) {
+                writer.value(jobId);
+              }
+            }
+          }
+          writer.endArray();
+        }
+        writer.name("engine").value(
+          HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE));
+        writer.name("hash").value(getQueryHash(queryStr));
+        writer.name("queryText").value(queryStr);
+
+        List<Edge> edges = getEdges(plan, index);
+        Set<Vertex> vertices = getVertices(edges);
+        writeEdges(writer, edges);
+        writeVertices(writer, vertices);
+        writer.endObject();
+        writer.close();
+
+        // Log the lineage info
+        String lineage = out.toString();
+        if (testMode) {
+          // Log to console
+          log(lineage);
+        } else {
+          // In non-test mode, emit to a log file,
+          // which can be different from the normal hive.log.
+          // For example, using NoDeleteRollingFileAppender to
+          // log to some file with different rolling policy.
+          LOG.info(lineage);
+        }
+      } catch (Throwable t) {
+        // Don't fail the query just because of any lineage issue.
+        log("Failed to log lineage graph, query is not affected\n"
+          + org.apache.hadoop.util.StringUtils.stringifyException(t));
+      }
+    }
+  }
+
+  /**
+   * Log an error to console if available.
+   */
+  private void log(String error) {
+    LogHelper console = SessionState.getConsole();
+    if (console != null) {
+      console.printError(error);
+    }
+  }
+
+  /**
+   * Based on the final select operator, find out all the target columns.
+   * For each target column, find out its sources based on the dependency index.
+   */
+  private List<Edge> getEdges(QueryPlan plan, Index index) {
+    List<FieldSchema> fieldSchemas = plan.getResultSchema().getFieldSchemas();
+    int fields = fieldSchemas == null ? 0 : fieldSchemas.size();
+    SelectOperator finalSelOp = index.getFinalSelectOp();
+    List<Edge> edges = new ArrayList<Edge>();
+    if (finalSelOp != null && fields > 0) {
+      Map<ColumnInfo, Dependency> colMap = index.getDependencies(finalSelOp);
+      List<Dependency> dependencies = colMap != null ? Lists.newArrayList(colMap.values()) : null;
+      if (dependencies == null || dependencies.size() != fields) {
+        log("Result schema has " + fields
+          + " fields, but we don't get as many dependencies");
+      } else {
+        String destTableName = null;
+        List<String> colNames = null;
+        // Based on the plan outputs, find out the target table name and column names.
+        for (WriteEntity output : plan.getOutputs()) {
+          if (output.getType() == Entity.Type.TABLE) {
+            org.apache.hadoop.hive.ql.metadata.Table t = output.getTable();
+            destTableName = t.getDbName() + "." + t.getTableName();
+            List<FieldSchema> cols = t.getCols();
+            if (cols != null && !cols.isEmpty()) {
+              colNames = Utilities.getColumnNamesFromFieldSchema(cols);
+            }
+            break;
+          }
+        }
+
+        // Go through each target column, generate the lineage edges.
+        Set<Vertex> allTargets = new LinkedHashSet<Vertex>();
+        Map<String, Vertex> allSources = new LinkedHashMap<String, Vertex>();
+        for (int i = 0; i < fields; i++) {
+          Vertex target = new Vertex(
+            getTargetFieldName(i, destTableName, colNames, fieldSchemas));
+          allTargets.add(target);
+          Dependency dep = dependencies.get(i);
+          String expr = dep.getExpr();
+          Set<Vertex> sources = createSourceVertices(allSources, dep.getBaseCols());
+          Edge edge = findSimilarEdgeBySources(edges, sources, expr, Edge.Type.PROJECTION);
+          if (edge == null) {
+            Set<Vertex> targets = new LinkedHashSet<Vertex>();
+            targets.add(target);
+            edges.add(new Edge(sources, targets, expr, Edge.Type.PROJECTION));
+          } else {
+            edge.targets.add(target);
+          }
+        }
+        Set<Predicate> conds = index.getPredicates(finalSelOp);
+        if (conds != null && !conds.isEmpty()) {
+          for (Predicate cond: conds) {
+            String expr = cond.getExpr();
+            Set<Vertex> sources = createSourceVertices(allSources, cond.getBaseCols());
+            Edge edge = findSimilarEdgeByTargets(edges, allTargets, expr, Edge.Type.PREDICATE);
+            if (edge == null) {
+              edges.add(new Edge(sources, allTargets, expr, Edge.Type.PREDICATE));
+            } else {
+              edge.sources.addAll(sources);
+            }
+          }
+        }
+      }
+    }
+    return edges;
+  }
+
+  /**
+   * Convert a list of columns to a set of vertices.
+   * Use cached vertices if possible.
+   */
+  private Set<Vertex> createSourceVertices(
+      Map<String, Vertex> srcVertexCache, Collection<BaseColumnInfo> baseCols) {
+    Set<Vertex> sources = new LinkedHashSet<Vertex>();
+    if (baseCols != null && !baseCols.isEmpty()) {
+      for(BaseColumnInfo col: baseCols) {
+        Table table = col.getTabAlias().getTable();
+        if (table.isTemporary()) {
+          // Ignore temporary tables
+          continue;
+        }
+        Vertex.Type type = Vertex.Type.TABLE;
+        String tableName = table.getDbName() + "." + table.getTableName();
+        FieldSchema fieldSchema = col.getColumn();
+        String label = tableName;
+        if (fieldSchema != null) {
+          type = Vertex.Type.COLUMN;
+          label = tableName + "." + fieldSchema.getName();
+        }
+        sources.add(getOrCreateVertex(srcVertexCache, label, type));
+      }
+    }
+    return sources;
+  }
+
+  /**
+   * Find a vertex in the cache, or create one if it is not there.
+   */
+  private Vertex getOrCreateVertex(
+      Map<String, Vertex> vertices, String label, Vertex.Type type) {
+    Vertex vertex = vertices.get(label);
+    if (vertex == null) {
+      vertex = new Vertex(label, type);
+      vertices.put(label, vertex);
+    }
+    return vertex;
+  }
+
+  /**
+   * Find an edge that has the same type, expression, and sources.
+   */
+  private Edge findSimilarEdgeBySources(
+      List<Edge> edges, Set<Vertex> sources, String expr, Edge.Type type) {
+    for (Edge edge: edges) {
+      if (edge.type == type && StringUtils.equals(edge.expr, expr)
+          && SetUtils.isEqualSet(edge.sources, sources)) {
+        return edge;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Find an edge that has the same type, expression, and targets.
+   */
+  private Edge findSimilarEdgeByTargets(
+      List<Edge> edges, Set<Vertex> targets, String expr, Edge.Type type) {
+    for (Edge edge: edges) {
+      if (edge.type == type && StringUtils.equals(edge.expr, expr)
+          && SetUtils.isEqualSet(edge.targets, targets)) {
+        return edge;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Generate normalized name for a given target column.
+   */
+  private String getTargetFieldName(int fieldIndex,
+      String destTableName, List<String> colNames, List<FieldSchema> fieldSchemas) {
+    String fieldName = fieldSchemas.get(fieldIndex).getName();
+    String[] parts = fieldName.split("\\.");
+    if (destTableName != null) {
+      String colName = parts[parts.length - 1];
+      if (colNames != null && !colNames.contains(colName)) {
+        colName = colNames.get(fieldIndex);
+      }
+      return destTableName + "." + colName;
+    }
+    if (parts.length == 2 && parts[0].startsWith("_u")) {
+      return parts[1];
+    }
+    return fieldName;
+  }
+
+  /**
+   * Get all the vertices of all edges. Targets first,
+   * then sources. Assign an id to each vertex.
+   */
+  private Set<Vertex> getVertices(List<Edge> edges) {
+    Set<Vertex> vertices = new LinkedHashSet<Vertex>();
+    for (Edge edge: edges) {
+      vertices.addAll(edge.targets);
+    }
+    for (Edge edge: edges) {
+      vertices.addAll(edge.sources);
+    }
+
+    // Assign ids to all vertices,
+    // targets first, then sources.
+    int id = 0;
+    for (Vertex vertex: vertices) {
+      vertex.id = id++;
+    }
+    return vertices;
+  }
+
+  /**
+   * Write out a JSON array of edges.
+   */
+  private void writeEdges(JsonWriter writer, List<Edge> edges) throws IOException {
+    writer.name("edges");
+    writer.beginArray();
+    for (Edge edge: edges) {
+      writer.beginObject();
+      writer.name("sources");
+      writer.beginArray();
+      for (Vertex vertex: edge.sources) {
+        writer.value(vertex.id);
+      }
+      writer.endArray();
+      writer.name("targets");
+      writer.beginArray();
+      for (Vertex vertex: edge.targets) {
+        writer.value(vertex.id);
+      }
+      writer.endArray();
+      if (edge.expr != null) {
+        writer.name("expression").value(edge.expr);
+      }
+      writer.name("edgeType").value(edge.type.name());
+      writer.endObject();
+    }
+    writer.endArray();
+  }
+
+  /**
+   * Write out a JSON array of vertices.
+   */
+  private void writeVertices(JsonWriter writer, Set<Vertex> vertices) throws IOException {
+    writer.name("vertices");
+    writer.beginArray();
+    for (Vertex vertex: vertices) {
+      writer.beginObject();
+      writer.name("id").value(vertex.id);
+      writer.name("vertexType").value(vertex.type.name());
+      writer.name("vertexId").value(vertex.label);
+      writer.endObject();
+    }
+    writer.endArray();
+  }
+
+  /**
+   * Generate query string md5 hash.
+   */
+  private String getQueryHash(String queryStr) {
+    Hasher hasher = Hashing.md5().newHasher();
+    hasher.putString(queryStr);
+    return hasher.hash().toString();
+  }
+}
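
A small sketch of how the logger might be wired up, plus a rough picture of one
emitted record. The hive.exec.post.hooks property is the standard post-execute hook
setting (not something this patch adds), and the sample output below simply mirrors
the JsonWriter calls above with made-up values.

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.hooks.LineageLogger;

  public class EnableLineageLogger {
    public static void main(String[] args) {
      // This HiveConf would then back the session/Driver running the queries.
      HiveConf conf = new HiveConf();
      conf.set("hive.exec.post.hooks", LineageLogger.class.getName());

      // For each qualifying query the hook logs one record shaped roughly like:
      //
      // POSTHOOK: LINEAGE: {
      //   "version": "1.0",
      //   "user": "...", "timestamp": ..., "jobIds": [...],   // omitted in test mode
      //   "engine": "mr",
      //   "hash": "<md5 of the query text>",
      //   "queryText": "insert into table dest select key from src where key > 10",
      //   "edges": [
      //     {"sources": [1], "targets": [0], "edgeType": "PROJECTION"},
      //     {"sources": [1], "targets": [0], "expression": "(src.key > 10)",
      //      "edgeType": "PREDICATE"}
      //   ],
      //   "vertices": [
      //     {"id": 0, "vertexType": "COLUMN", "vertexId": "default.dest.key"},
      //     {"id": 1, "vertexType": "COLUMN", "vertexId": "default.src.key"}
      //   ]
      // }
    }
  }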

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/log/NoDeleteRollingFileAppender.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/NoDeleteRollingFileAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/NoDeleteRollingFileAppender.java
new file mode 100644
index 0000000..be32f06
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/log/NoDeleteRollingFileAppender.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.Writer;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.helpers.CountingQuietWriter;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.helpers.OptionConverter;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class NoDeleteRollingFileAppender extends FileAppender {
+  /**
+   * The default maximum file size is 10MB.
+   */
+  protected long maxFileSize = 10 * 1024 * 1024;
+
+  private long nextRollover = 0;
+
+  /**
+   * The default constructor simply calls its {@link FileAppender#FileAppender
+   * parent's constructor}.
+   */
+  public NoDeleteRollingFileAppender() {
+  }
+
+  /**
+   * Instantiate a RollingFileAppender and open the file designated by
+   * <code>filename</code>. The opened filename will become the output
+   * destination for this appender.
+   * <p>
+   * If the <code>append</code> parameter is true, the file will be appended to.
+   * Otherwise, the file designated by <code>filename</code> will be truncated
+   * before being opened.
+   */
+  public NoDeleteRollingFileAppender(Layout layout, String filename,
+      boolean append) throws IOException {
+    super(layout, filename, append);
+  }
+
+  /**
+   * Instantiate a FileAppender and open the file designated by
+   * <code>filename</code>. The opened filename will become the output
+   * destination for this appender.
+   * <p>
+   * The file will be appended to.
+   */
+  public NoDeleteRollingFileAppender(Layout layout, String filename)
+      throws IOException {
+    super(layout, filename);
+  }
+
+  /**
+   * Get the maximum size that the output file is allowed to reach before being
+   * rolled over to backup files.
+   */
+  public long getMaximumFileSize() {
+    return maxFileSize;
+  }
+
+  /**
+   * Implements the usual roll over behavior.
+   * <p>
+   * <code>File</code> is renamed <code>File.yyyyMMddHHmmss</code> and closed. A
+   * new <code>File</code> is created to receive further log output.
+   */
+  // synchronization not necessary since doAppend is already synced
+  public void rollOver() {
+    if (qw != null) {
+      long size = ((CountingQuietWriter) qw).getCount();
+      LogLog.debug("rolling over count=" + size);
+      // if operation fails, do not roll again until
+      // maxFileSize more bytes are written
+      nextRollover = size + maxFileSize;
+    }
+
+    this.closeFile(); // keep windows happy.
+
+    int p = fileName.lastIndexOf(".");
+    String file = p > 0 ? fileName.substring(0, p) : fileName;
+    try {
+      // This will also close the file. This is OK since multiple
+      // close operations are safe.
+      this.setFile(file, false, bufferedIO, bufferSize);
+      nextRollover = 0;
+    } catch (IOException e) {
+      if (e instanceof InterruptedIOException) {
+        Thread.currentThread().interrupt();
+      }
+      LogLog.error("setFile(" + file + ", false) call failed.", e);
+    }
+  }
+
+  public synchronized void setFile(String fileName, boolean append,
+      boolean bufferedIO, int bufferSize) throws IOException {
+    String newFileName = getLogFileName(fileName);
+    super.setFile(newFileName, append, this.bufferedIO, this.bufferSize);
+    if (append) {
+      File f = new File(newFileName);
+      ((CountingQuietWriter) qw).setCount(f.length());
+    }
+  }
+
+  /**
+   * Set the maximum size that the output file is allowed to reach before being
+   * rolled over to backup files.
+   * <p>
+   * This method is equivalent to {@link #setMaxFileSize} except that it is
+   * required for differentiating the setter taking a <code>long</code> argument
+   * from the setter taking a <code>String</code> argument by the JavaBeans
+   * {@link java.beans.Introspector Introspector}.
+   *
+   * @see #setMaxFileSize(String)
+   */
+  public void setMaximumFileSize(long maxFileSize) {
+    this.maxFileSize = maxFileSize;
+  }
+
+  /**
+   * Set the maximum size that the output file is allowed to reach before being
+   * rolled over to backup files.
+   * <p>
+   * In configuration files, the <b>MaxFileSize</b> option takes a long integer
+   * in the range 0 - 2^63. You can specify the value with the suffixes "KB",
+   * "MB" or "GB" so that the integer is interpreted as being expressed
+   * respectively in kilobytes, megabytes or gigabytes. For example, the value
+   * "10KB" will be interpreted as 10240.
+   */
+  public void setMaxFileSize(String value) {
+    maxFileSize = OptionConverter.toFileSize(value, maxFileSize + 1);
+  }
+
+  protected void setQWForFiles(Writer writer) {
+    this.qw = new CountingQuietWriter(writer, errorHandler);
+  }
+
+  /**
+   * This method differentiates RollingFileAppender from its super class.
+   */
+  protected void subAppend(LoggingEvent event) {
+    super.subAppend(event);
+
+    if (fileName != null && qw != null) {
+      long size = ((CountingQuietWriter) qw).getCount();
+      if (size >= maxFileSize && size >= nextRollover) {
+        rollOver();
+      }
+    }
+  }
+
+  // Mangled file name. Append the current timestamp
+  private static String getLogFileName(String oldFileName) {
+    return oldFileName + "." + Long.toString(System.currentTimeMillis());
+  }
+}
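
A minimal sketch of pointing the lineage log at this appender using the log4j 1.x
API; the file path and size are arbitrary, and an equivalent log4j.properties entry
would do the same job. Each file name gets a timestamp suffix, so a rollover just
starts a new file rather than deleting old ones.

  import java.io.IOException;

  import org.apache.hadoop.hive.ql.hooks.LineageLogger;
  import org.apache.hadoop.hive.ql.log.NoDeleteRollingFileAppender;
  import org.apache.log4j.Logger;
  import org.apache.log4j.PatternLayout;

  public class LineageAppenderSetup {
    public static void main(String[] args) throws IOException {
      NoDeleteRollingFileAppender appender = new NoDeleteRollingFileAppender(
          new PatternLayout("%m%n"), "/tmp/hive-lineage.log", true);
      appender.setMaxFileSize("100MB");          // roll after ~100MB of output

      Logger lineageLogger = Logger.getLogger(LineageLogger.class);
      lineageLogger.setAdditivity(false);        // keep lineage out of the main hive.log
      lineageLogger.addAppender(appender);
    }
  }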

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
index c930b80..46fe5db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
@@ -26,14 +26,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyType;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.TableAliasInfo;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -43,6 +48,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -162,6 +168,91 @@ public class ExprProcFactory {
     return new ColumnExprProcessor();
   }
 
+  private static boolean findSourceColumn(
+      LineageCtx lctx, Predicate cond, String tabAlias, String alias) {
+    for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap: lctx
+        .getParseCtx().getTopOps().entrySet()) {
+      Operator<? extends OperatorDesc> topOp = topOpMap.getValue();
+      if (topOp instanceof TableScanOperator) {
+        TableScanOperator tableScanOp = (TableScanOperator) topOp;
+        Table tbl = tableScanOp.getConf().getTableMetadata();
+        if (tbl.getTableName().equals(tabAlias)
+            || tabAlias.equals(tableScanOp.getConf().getAlias())) {
+          for (FieldSchema column: tbl.getCols()) {
+            if (column.getName().equals(alias)) {
+              TableAliasInfo table = new TableAliasInfo();
+              table.setTable(tbl.getTTable());
+              table.setAlias(tabAlias);
+              BaseColumnInfo colInfo = new BaseColumnInfo();
+              colInfo.setColumn(column);
+              colInfo.setTabAlias(table);
+              cond.getBaseCols().add(colInfo);
+              return true;
+            }
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the expression string of an expression node.
+   */
+  public static String getExprString(RowSchema rs, ExprNodeDesc expr,
+      LineageCtx lctx, Operator<? extends OperatorDesc> inpOp, Predicate cond) {
+    if (expr instanceof ExprNodeColumnDesc) {
+      ExprNodeColumnDesc col = (ExprNodeColumnDesc) expr;
+      String columnName = col.getColumn();
+      ColumnInfo ci = rs.getColumnInfo(columnName);
+      String alias = ci != null ? ci.getAlias() : columnName;
+      String internalName = ci != null ? ci.getInternalName() : columnName;
+      Dependency dep = lctx.getIndex().getDependency(inpOp, internalName);
+      String tabAlias = ci != null ? ci.getTabAlias() : col.getTabAlias();
+      if ((tabAlias == null || tabAlias.startsWith("_") || tabAlias.startsWith("$"))
+          && (dep != null && dep.getType() == DependencyType.SIMPLE)) {
+        List<BaseColumnInfo> baseCols = dep.getBaseCols();
+        if (baseCols != null && !baseCols.isEmpty()) {
+          BaseColumnInfo baseCol = baseCols.get(0);
+          tabAlias = baseCol.getTabAlias().getAlias();
+          alias = baseCol.getColumn().getName();
+        }
+      }
+      if (tabAlias != null && tabAlias.length() > 0
+          && !tabAlias.startsWith("_") && !tabAlias.startsWith("$")) {
+        if (cond != null && !findSourceColumn(lctx, cond, tabAlias, alias) && dep != null) {
+          cond.getBaseCols().addAll(dep.getBaseCols());
+        }
+        return tabAlias + "." + alias;
+      }
+
+      if (dep != null) {
+        if (cond != null) {
+          cond.getBaseCols().addAll(dep.getBaseCols());
+        }
+        if (dep.getExpr() != null) {
+          return dep.getExpr();
+        }
+      }
+      if (alias.startsWith("_")) {
+        ci = inpOp.getSchema().getColumnInfo(columnName);
+        if (ci != null) {
+          alias = ci.getAlias();
+        }
+      }
+      return alias;
+    } else if (expr instanceof ExprNodeGenericFuncDesc) {
+      ExprNodeGenericFuncDesc func = (ExprNodeGenericFuncDesc) expr;
+      List<ExprNodeDesc> children = func.getChildren();
+      String[] childrenExprStrings = new String[children.size()];
+      for (int i = 0; i < childrenExprStrings.length; i++) {
+        childrenExprStrings[i] = getExprString(rs, children.get(i), lctx, inpOp, cond);
+      }
+      return func.getGenericUDF().getDisplayString(childrenExprStrings);
+    }
+    return expr.getExprString();
+  }
+
   /**
    * Gets the expression dependencies for the expression.
    *

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
index 51bef04..9a5cf55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -59,8 +61,11 @@ public class Generator implements Transform {
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
 
+    Index index = SessionState.get() != null ?
+      SessionState.get().getLineageState().getIndex() : new Index();
+
     // Create the lineage context
-    LineageCtx lCtx = new LineageCtx(pctx);
+    LineageCtx lCtx = new LineageCtx(pctx, index);
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
@@ -83,7 +88,9 @@ public class Generator implements Transform {
     opRules.put(new RuleRegExp("R9", LateralViewJoinOperator.getOperatorName() + "%"),
       OpProcFactory.getLateralViewJoinProc());
     opRules.put(new RuleRegExp("R10", PTFOperator.getOperatorName() + "%"),
-        OpProcFactory.getTransformProc());
+      OpProcFactory.getTransformProc());
+    opRules.put(new RuleRegExp("R11", FilterOperator.getOperatorName() + "%"),
+      OpProcFactory.getFilterProc());
 
     // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(OpProcFactory.getDefaultProc(), opRules, lCtx);
@@ -94,11 +101,6 @@ public class Generator implements Transform {
     topNodes.addAll(pctx.getTopOps().values());
     ogw.startWalking(topNodes, null);
 
-    // Transfer the index from the lineage context to the session state.
-    if (SessionState.get() != null) {
-      SessionState.get().getLineageState().setIndex(lCtx.getIndex());
-    }
-
     return pctx;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
index cef24e3..d26d8da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer.lineage;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -26,9 +27,11 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -44,11 +47,6 @@ public class LineageCtx implements NodeProcessorCtx {
   public static class Index {
 
     /**
-     * Serial Version UID.
-     */
-    private static final long serialVersionUID = 1L;
-
-    /**
      * The map contains an index from the (operator, columnInfo) to the
      * dependency vector for that tuple. This is used to generate the
      * dependency vectors during the walk of the operator tree.
@@ -57,12 +55,20 @@ public class LineageCtx implements NodeProcessorCtx {
                       LinkedHashMap<ColumnInfo, Dependency>> depMap;
 
     /**
+     * A map from operator to the conditions strings.
+     */
+    private final Map<Operator<? extends OperatorDesc>, Set<Predicate>> condMap;
+
+    private SelectOperator finalSelectOp;
+
+    /**
      * Constructor.
      */
     public Index() {
       depMap =
         new LinkedHashMap<Operator<? extends OperatorDesc>,
                           LinkedHashMap<ColumnInfo, Dependency>>();
+      condMap = new HashMap<Operator<? extends OperatorDesc>, Set<Predicate>>();
     }
 
     /**
@@ -83,6 +89,28 @@ public class LineageCtx implements NodeProcessorCtx {
     }
 
     /**
+     * Gets the dependency for a tuple of an operator,
+     * and a ColumnInfo with specified internal name.
+     *
+     * @param op The operator whose dependency is being inspected.
+     * @param internalName The internal name of the column info
+     * @return Dependency for that particular operator, ColumnInfo tuple.
+     *         null if no dependency is found.
+     */
+    public Dependency getDependency(
+        Operator<? extends OperatorDesc> op, String internalName) {
+      Map<ColumnInfo, Dependency> colMap = depMap.get(op);
+      if (colMap != null) {
+        for (Map.Entry<ColumnInfo, Dependency> e: colMap.entrySet()) {
+          if (e.getKey().getInternalName().equals(internalName)) {
+            return e.getValue();
+          }
+        }
+      }
+      return null;
+    }
+
+    /**
      * Puts the dependency for an operator, columninfo tuple.
      * @param op The operator whose dependency is being inserted.
      * @param col The column info whose dependency is being inserted.
@@ -124,7 +152,43 @@ public class LineageCtx implements NodeProcessorCtx {
       }
     }
 
+    public Map<ColumnInfo, Dependency> getDependencies(Operator<? extends OperatorDesc> op) {
+      return depMap.get(op);
+    }
+
+    public void addPredicate(Operator<? extends OperatorDesc> op, Predicate cond) {
+      Set<Predicate> conds = condMap.get(op);
+      if (conds == null) {
+        conds = new LinkedHashSet<Predicate>();
+        condMap.put(op, conds);
+      }
+      conds.add(cond);
+    }
+
+    public void copyPredicates(Operator<? extends OperatorDesc> srcOp,
+        Operator<? extends OperatorDesc> tgtOp) {
+      Set<Predicate> conds = getPredicates(srcOp);
+      if (conds != null) {
+        for (Predicate cond: conds) {
+          addPredicate(tgtOp, cond);
+        }
+      }
+    }
+
+    public Set<Predicate> getPredicates(Operator<? extends OperatorDesc> op) {
+      return condMap.get(op);
+    }
+
+    public void setFinalSelectOp(SelectOperator sop) {
+      finalSelectOp = sop;
+    }
+
+    public SelectOperator getFinalSelectOp() {
+      return finalSelectOp;
+    }
+
     public void clear() {
+      finalSelectOp = null;
       depMap.clear();
     }
   }
@@ -145,9 +209,10 @@ public class LineageCtx implements NodeProcessorCtx {
    * Constructor.
    *
    * @param pctx The parse context that is used to get table metadata information.
+   * @param index The dependency map.
    */
-  public LineageCtx(ParseContext pctx) {
-    index = new Index();
+  public LineageCtx(ParseContext pctx, Index index) {
+    this.index = index;
     this.pctx = pctx;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
index 5957ac0..f670db8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
@@ -27,18 +27,20 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.ForwardOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyType;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.TableAliasInfo;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -56,6 +59,8 @@ import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -72,6 +77,7 @@ public class OpProcFactory {
    *
    * @return Operator The parent operator in the current path.
    */
+  @SuppressWarnings("unchecked")
   protected static Operator<? extends OperatorDesc> getParent(Stack<Node> stack) {
     return (Operator<? extends OperatorDesc>)Utils.getNthAncestor(stack, 1);
   }
@@ -89,8 +95,10 @@ public class OpProcFactory {
       LineageCtx lCtx = (LineageCtx) procCtx;
 
       // The operators
+      @SuppressWarnings("unchecked")
       Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>)nd;
       Operator<? extends OperatorDesc> inpOp = getParent(stack);
+      lCtx.getIndex().copyPredicates(inpOp, op);
 
       // Create a single dependency list by concatenating the dependencies of all
       // the cols
@@ -105,16 +113,27 @@ public class OpProcFactory {
         Dependency d = lCtx.getIndex().getDependency(inpOp, ci);
         if (d != null) {
           new_type = LineageCtx.getNewDependencyType(d.getType(), new_type);
-          col_set.addAll(d.getBaseCols());
+          if (!ci.isHiddenVirtualCol()) {
+            col_set.addAll(d.getBaseCols());
+          }
         }
       }
 
       dep.setType(new_type);
       dep.setBaseCols(new ArrayList<BaseColumnInfo>(col_set));
 
+      boolean isScript = op instanceof ScriptOperator;
+
       // This dependency is then set for all the colinfos of the script operator
       for(ColumnInfo ci : op.getSchema().getSignature()) {
-        lCtx.getIndex().putDependency(op, ci, dep);
+        Dependency d = dep;
+        if (!isScript) {
+          Dependency dep_ci = lCtx.getIndex().getDependency(inpOp, ci);
+          if (dep_ci != null) {
+            d = dep_ci;
+          }
+        }
+        lCtx.getIndex().putDependency(op, ci, d);
       }
 
       return null;
@@ -167,8 +186,6 @@ public class OpProcFactory {
 
         // Populate the dependency
         dep.setType(LineageInfo.DependencyType.SIMPLE);
-        // TODO: Find out how to get the expression here.
-        dep.setExpr(null);
         dep.setBaseCols(new ArrayList<BaseColumnInfo>());
         dep.getBaseCols().add(bci);
 
@@ -189,7 +206,7 @@ public class OpProcFactory {
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
 
-      // Assert that there is atleast one item in the stack. This should never
+      // Assert that there is at least one item in the stack. This should never
       // be called for leafs.
       assert(!stack.isEmpty());
 
@@ -200,6 +217,12 @@ public class OpProcFactory {
 
       // The input operator to the join is always a reduce sink operator
       ReduceSinkOperator inpOp = (ReduceSinkOperator)getParent(stack);
+      lCtx.getIndex().copyPredicates(inpOp, op);
+      Predicate cond = getPredicate(op, lCtx);
+      if (cond != null) {
+        lCtx.getIndex().addPredicate(op, cond);
+      }
+
       ReduceSinkDesc rd = inpOp.getConf();
       int tag = rd.getTag();
 
@@ -221,6 +244,59 @@ public class OpProcFactory {
       return null;
     }
 
+    private Predicate getPredicate(JoinOperator jop, LineageCtx lctx) {
+      List<Operator<? extends OperatorDesc>> parentOperators = jop.getParentOperators();
+      JoinDesc jd = jop.getConf();
+      ExprNodeDesc [][] joinKeys = jd.getJoinKeys();
+      if (joinKeys == null || parentOperators == null || parentOperators.size() < 2) {
+        return null;
+      }
+      LineageCtx.Index index = lctx.getIndex();
+      for (Operator<? extends OperatorDesc> op: parentOperators) {
+        if (index.getDependencies(op) == null) {
+          return null;
+        }
+      }
+      Predicate cond = new Predicate();
+      JoinCondDesc[] conds = jd.getConds();
+      int parents = parentOperators.size();
+      StringBuilder sb = new StringBuilder("(");
+      for (int i = 0; i < conds.length; i++) {
+        if (i != 0) {
+          sb.append(" AND ");
+        }
+        int left = conds[i].getLeft();
+        int right = conds[i].getRight();
+        if (joinKeys.length <= left
+            || joinKeys[left].length == 0
+            || joinKeys.length <= right
+            || joinKeys[right].length == 0
+            || parents <= left
+            || parents <= right) {
+          return null;
+        }
+        ExprNodeDesc expr = joinKeys[left][0];
+        Operator<? extends OperatorDesc> op = parentOperators.get(left);
+        List<Operator<? extends OperatorDesc>> p = op.getParentOperators();
+        if (p == null || p.isEmpty()) {
+          return null;
+        }
+        sb.append(ExprProcFactory.getExprString(op.getSchema(),
+          expr, lctx, p.get(0), cond));
+        sb.append(" = ");
+        expr = joinKeys[right][0];
+        op = parentOperators.get(right);
+        p = op.getParentOperators();
+        if (p == null || p.isEmpty()) {
+          return null;
+        }
+        sb.append(ExprProcFactory.getExprString(op.getSchema(),
+          expr, lctx, p.get(0), cond));
+      }
+      sb.append(")");
+      cond.setExpr(sb.toString());
+      return cond;
+    }
   }
 
   /**
@@ -231,7 +307,7 @@ public class OpProcFactory {
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
 
-      // Assert that there is atleast one item in the stack. This should never
+      // Assert that there is at least one item in the stack. This should never
       // be called for leafs.
       assert(!stack.isEmpty());
 
@@ -241,6 +317,7 @@ public class OpProcFactory {
       boolean isUdtfPath = true;
       Operator<? extends OperatorDesc> inpOp = getParent(stack);
       ArrayList<ColumnInfo> cols = inpOp.getSchema().getSignature();
+      lCtx.getIndex().copyPredicates(inpOp, op);
 
       if (inpOp instanceof SelectOperator) {
         isUdtfPath = false;
@@ -295,11 +372,31 @@ public class OpProcFactory {
       // Otherwise we treat this as a normal select operator and look at
       // the expressions.
 
-      ArrayList<ColumnInfo> col_infos = sop.getSchema().getSignature();
+      Operator<? extends OperatorDesc> inpOp = getParent(stack);
+      lctx.getIndex().copyPredicates(inpOp, sop);
+
+      RowSchema rs = sop.getSchema();
+      ArrayList<ColumnInfo> col_infos = rs.getSignature();
       int cnt = 0;
       for(ExprNodeDesc expr : sop.getConf().getColList()) {
-        lctx.getIndex().putDependency(sop, col_infos.get(cnt++),
-            ExprProcFactory.getExprDependency(lctx, getParent(stack), expr));
+        Dependency dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr);
+        if (dep != null && dep.getExpr() == null && (dep.getBaseCols().isEmpty()
+            || dep.getType() != LineageInfo.DependencyType.SIMPLE)) {
+          dep.setExpr(ExprProcFactory.getExprString(rs, expr, lctx, inpOp, null));
+        }
+        lctx.getIndex().putDependency(sop, col_infos.get(cnt++), dep);
+      }
+
+      Operator<? extends OperatorDesc> op = null;
+      if (!sop.getChildOperators().isEmpty()) {
+        op = sop.getChildOperators().get(0);
+        if (!op.getChildOperators().isEmpty() && op instanceof LimitOperator) {
+          op = op.getChildOperators().get(0);
+        }
+      }
+      if (op == null || (op.getChildOperators().isEmpty()
+          && op instanceof FileSinkOperator)) {
+        lctx.getIndex().setFinalSelectOp(sop);
       }
 
       return null;
@@ -319,6 +416,7 @@ public class OpProcFactory {
       GroupByOperator gop = (GroupByOperator)nd;
       ArrayList<ColumnInfo> col_infos = gop.getSchema().getSignature();
       Operator<? extends OperatorDesc> inpOp = getParent(stack);
+      lctx.getIndex().copyPredicates(inpOp, gop);
       int cnt = 0;
 
       for(ExprNodeDesc expr : gop.getConf().getKeys()) {
@@ -326,21 +424,64 @@ public class OpProcFactory {
             ExprProcFactory.getExprDependency(lctx, inpOp, expr));
       }
 
+      // If this is a reduce-side GroupBy operator, check whether there is
+      // a corresponding map-side one. If so, some expressions may already
+      // have been resolved on the map side.
+      boolean reduceSideGop = (inpOp instanceof ReduceSinkOperator)
+        && (Utils.getNthAncestor(stack, 2) instanceof GroupByOperator);
+
+      RowSchema rs = gop.getSchema();
       for(AggregationDesc agg : gop.getConf().getAggregators()) {
         // Concatenate the dependencies of all the parameters to
         // create the new dependency
         Dependency dep = new Dependency();
         DependencyType new_type = LineageInfo.DependencyType.EXPRESSION;
-        // TODO: Get the actual string here.
-        dep.setExpr(null);
+        StringBuilder sb = new StringBuilder();
+        boolean first = true;
         LinkedHashSet<BaseColumnInfo> bci_set = new LinkedHashSet<BaseColumnInfo>();
         for(ExprNodeDesc expr : agg.getParameters()) {
+          if (first) {
+            first = false;
+          } else {
+            sb.append(", ");
+          }
           Dependency expr_dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr);
-          if (expr_dep != null) {
+          if (expr_dep != null && !expr_dep.getBaseCols().isEmpty()) {
             new_type = LineageCtx.getNewDependencyType(expr_dep.getType(), new_type);
             bci_set.addAll(expr_dep.getBaseCols());
+            if (expr_dep.getType() == LineageInfo.DependencyType.SIMPLE) {
+              BaseColumnInfo col = expr_dep.getBaseCols().get(0);
+              Table t = col.getTabAlias().getTable();
+              if (t != null) {
+                sb.append(t.getDbName()).append(".").append(t.getTableName()).append(".");
+              }
+              sb.append(col.getColumn().getName());
+            }
+          }
+          if (expr_dep == null || expr_dep.getBaseCols().isEmpty()
+              || expr_dep.getType() != LineageInfo.DependencyType.SIMPLE) {
+            sb.append(expr_dep != null && expr_dep.getExpr() != null ? expr_dep.getExpr() :
+              ExprProcFactory.getExprString(rs, expr, lctx, inpOp, null));
           }
         }
+        String expr = sb.toString();
+        String udafName = agg.getGenericUDAFName();
+        if (!(reduceSideGop && expr.startsWith(udafName))) {
+          sb.setLength(0); // reset the buffer
+          sb.append(udafName);
+          sb.append("(");
+          if (agg.getDistinct()) {
+            sb.append("DISTINCT ");
+          }
+          sb.append(expr);
+          if (first) {
+            // No parameter, count(*)
+            sb.append("*");
+          }
+          sb.append(")");
+          expr = sb.toString();
+        }
+        dep.setExpr(expr);
 
         // If the bci_set is empty, this means that the inputs to this
         // aggregate function were all constants (e.g. count(1)). In this case
@@ -390,13 +531,11 @@ public class OpProcFactory {
    */
   public static class UnionLineage extends DefaultLineage implements NodeProcessor {
 
-    protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName());
-
     @SuppressWarnings("unchecked")
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      // Assert that there is atleast one item in the stack. This should never
+      // Assert that there is at least one item in the stack. This should never
       // be called for leafs.
       assert(!stack.isEmpty());
 
@@ -407,6 +546,7 @@ public class OpProcFactory {
       // Get the row schema of the input operator.
       // The row schema of the parent operator
       Operator<? extends OperatorDesc> inpOp = getParent(stack);
+      lCtx.getIndex().copyPredicates(inpOp, op);
       RowSchema rs = op.getSchema();
       ArrayList<ColumnInfo> inp_cols = inpOp.getSchema().getSignature();
       int cnt = 0;
@@ -425,13 +565,10 @@ public class OpProcFactory {
    */
   public static class ReduceSinkLineage implements NodeProcessor {
 
-    protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName());
-
-    @SuppressWarnings("unchecked")
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      // Assert that there is atleast one item in the stack. This should never
+      // Assert that there is at least one item in the stack. This should never
       // be called for leafs.
       assert(!stack.isEmpty());
 
@@ -440,6 +577,7 @@ public class OpProcFactory {
       ReduceSinkOperator rop = (ReduceSinkOperator)nd;
 
       Operator<? extends OperatorDesc> inpOp = getParent(stack);
+      lCtx.getIndex().copyPredicates(inpOp, rop);
       int cnt = 0;
 
       // The keys are included only in case the reduce sink feeds into
@@ -492,18 +630,56 @@ public class OpProcFactory {
   }
 
   /**
+   * Filter processor.
+   */
+  public static class FilterLineage implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      // Assert that there is at least one item in the stack. This should never
+      // be called for leafs.
+      assert(!stack.isEmpty());
+
+      // LineageCtx
+      LineageCtx lCtx = (LineageCtx) procCtx;
+      FilterOperator fop = (FilterOperator)nd;
+
+      // Get the parent (input) operator; its predicates and per-column
+      // dependencies are carried over to this filter operator.
+      Operator<? extends OperatorDesc> inpOp = getParent(stack);
+      lCtx.getIndex().copyPredicates(inpOp, fop);
+      FilterDesc filterDesc = fop.getConf();
+      RowSchema rs = fop.getSchema();
+      if (!filterDesc.isGenerated()) {
+        Predicate cond = new Predicate();
+        cond.setExpr(ExprProcFactory.getExprString(
+          rs, filterDesc.getPredicate(), lCtx, inpOp, cond));
+        lCtx.getIndex().addPredicate(fop, cond);
+      }
+
+      ArrayList<ColumnInfo> inp_cols = inpOp.getSchema().getSignature();
+      int cnt = 0;
+      for(ColumnInfo ci : rs.getSignature()) {
+        lCtx.getIndex().putDependency(fop, ci,
+            lCtx.getIndex().getDependency(inpOp, inp_cols.get(cnt++)));
+      }
+
+      return null;
+    }
+  }
+
+  /**
    * Default processor. This basically passes the input dependencies as such
    * to the output dependencies.
    */
   public static class DefaultLineage implements NodeProcessor {
 
-    protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName());
-
     @SuppressWarnings("unchecked")
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      // Assert that there is atleast one item in the stack. This should never
+      // Assert that there is at least one item in the stack. This should never
       // be called for leafs.
       assert(!stack.isEmpty());
 
@@ -514,6 +690,7 @@ public class OpProcFactory {
       // Get the row schema of the input operator.
       // The row schema of the parent operator
       Operator<? extends OperatorDesc> inpOp = getParent(stack);
+      lCtx.getIndex().copyPredicates(inpOp, op);
       RowSchema rs = op.getSchema();
       ArrayList<ColumnInfo> inp_cols = inpOp.getSchema().getSignature();
       int cnt = 0;
@@ -561,4 +738,7 @@ public class OpProcFactory {
     return new DefaultLineage();
   }
 
+  public static NodeProcessor getFilterProc() {
+    return new FilterLineage();
+  }
 }

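JoinLineage.getPredicate above renders each equi-join condition as a "left = right" term, ANDed together and wrapped in parentheses. A rough, hypothetical illustration of that string assembly (the KeyPair type is invented for the sketch, not Hive API); for the dest2 query in lineage2.q the real code would produce something like "(default.src1.key = default.src2.key2)", depending on how ExprProcFactory.getExprString qualifies the columns:

    import java.util.Arrays;
    import java.util.List;

    public class JoinPredicateSketch {
      // One equi-join condition: the rendered strings of its left and right keys.
      record KeyPair(String left, String right) {}

      // Mirrors the StringBuilder loop in JoinLineage.getPredicate: AND together
      // one "left = right" term per join condition, wrapped in parentheses.
      static String render(List<KeyPair> conds) {
        StringBuilder sb = new StringBuilder("(");
        for (int i = 0; i < conds.size(); i++) {
          if (i != 0) {
            sb.append(" AND ");
          }
          KeyPair kp = conds.get(i);
          sb.append(kp.left()).append(" = ").append(kp.right());
        }
        return sb.append(")").toString();
      }

      public static void main(String[] args) {
        // Roughly the dest2 query in lineage2.q: src1 JOIN src2 ON src1.key = src2.key2
        System.out.println(render(Arrays.asList(
            new KeyPair("default.src1.key", "default.src2.key2"))));
        // Prints: (default.src1.key = default.src2.key2)
      }
    }
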
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index f41668b..b02374e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -114,8 +114,11 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.Optimizer;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverterPostProc;
+import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec.SpecType;
 import org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
@@ -206,7 +209,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.InputFormat;
@@ -2886,8 +2888,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       return input;
     }
 
-    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
-        new FilterDesc(filterPred, false),
+    FilterDesc filterDesc = new FilterDesc(filterPred, false);
+    filterDesc.setGenerated(true);
+    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(filterDesc,
         new RowSchema(inputRR.getColumnInfos()), input), inputRR);
 
     if (LOG.isDebugEnabled()) {
@@ -5394,6 +5397,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       OpParseContext inputCtx = opParseCtx.get(input);
       RowResolver inputRR = inputCtx.getRowResolver();
       FilterDesc orFilterDesc = new FilterDesc(previous, false);
+      orFilterDesc.setGenerated(true);
 
       selectInput = putOpInsertMap(OperatorFactory.getAndMakeChild(
           orFilterDesc, new RowSchema(
@@ -9459,9 +9463,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         LOG.info("No need for sample filter");
         ExprNodeDesc samplePredicate = genSamplePredicate(ts, tabBucketCols,
             colsEqual, alias, rwsch, qb.getMetaData(), null);
-        op = OperatorFactory.getAndMakeChild(new FilterDesc(
-            samplePredicate, true, new SampleDesc(ts.getNumerator(), ts
-                .getDenominator(), tabBucketCols, true)),
+        FilterDesc filterDesc = new FilterDesc(
+          samplePredicate, true, new SampleDesc(ts.getNumerator(),
+            ts.getDenominator(), tabBucketCols, true));
+        filterDesc.setGenerated(true);
+        op = OperatorFactory.getAndMakeChild(filterDesc,
             new RowSchema(rwsch.getColumnInfos()), top);
       } else {
         // need to add filter
@@ -9469,8 +9475,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         LOG.info("Need sample filter");
         ExprNodeDesc samplePredicate = genSamplePredicate(ts, tabBucketCols,
             colsEqual, alias, rwsch, qb.getMetaData(), null);
-        op = OperatorFactory.getAndMakeChild(new FilterDesc(
-            samplePredicate, true),
+        FilterDesc filterDesc = new FilterDesc(samplePredicate, true);
+        filterDesc.setGenerated(true);
+        op = OperatorFactory.getAndMakeChild(filterDesc,
             new RowSchema(rwsch.getColumnInfos()), top);
       }
     } else {
@@ -9499,11 +9506,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
             qb.getParseInfo().setTabSample(alias, tsSample);
             ExprNodeDesc samplePred = genSamplePredicate(tsSample, tab
                 .getBucketCols(), true, alias, rwsch, qb.getMetaData(), null);
-            op = OperatorFactory
-                .getAndMakeChild(new FilterDesc(samplePred, true,
-                    new SampleDesc(tsSample.getNumerator(), tsSample
-                        .getDenominator(), tab.getBucketCols(), true)),
-                    new RowSchema(rwsch.getColumnInfos()), top);
+            FilterDesc filterDesc = new FilterDesc(samplePred, true,
+              new SampleDesc(tsSample.getNumerator(), tsSample
+                .getDenominator(), tab.getBucketCols(), true));
+            filterDesc.setGenerated(true);
+            op = OperatorFactory.getAndMakeChild(filterDesc,
+              new RowSchema(rwsch.getColumnInfos()), top);
             LOG.info("No need for sample filter");
           } else {
             // The table is not bucketed, add a dummy filter :: rand()
@@ -9517,8 +9525,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                     .valueOf(460476415)));
             ExprNodeDesc samplePred = genSamplePredicate(tsSample, null, false,
                 alias, rwsch, qb.getMetaData(), randFunc);
-            op = OperatorFactory.getAndMakeChild(new FilterDesc(
-                samplePred, true),
+            FilterDesc filterDesc = new FilterDesc(samplePred, true);
+            filterDesc.setGenerated(true);
+            op = OperatorFactory.getAndMakeChild(filterDesc,
                 new RowSchema(rwsch.getColumnInfos()), top);
           }
         }
@@ -10150,6 +10159,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       } catch (HiveException e) {
         throw new SemanticException(e);
       }
+
+      // Generate lineage info for create view statements
+      // if LineageLogger hook is configured.
+      if (HiveConf.getVar(conf, HiveConf.ConfVars.POSTEXECHOOKS).contains(
+          "org.apache.hadoop.hive.ql.hooks.LineageLogger")) {
+        ArrayList<Transform> transformations = new ArrayList<Transform>();
+        transformations.add(new HiveOpConverterPostProc());
+        transformations.add(new Generator());
+        for (Transform t : transformations) {
+          pCtx = t.transform(pCtx);
+        }
+      }
       return;
     }
 

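The create-view branch above decides whether to run the lineage transforms by calling contains() on the comma-separated hive.exec.post.hooks value, which is adequate for a fully qualified class name. If a stricter membership test were ever wanted, splitting on commas and comparing whole entries is the usual alternative; a small sketch of that check (illustrative only, not what the patch does):

    import java.util.Arrays;

    public class HookCheckSketch {
      static boolean hasHook(String configuredHooks, String hookClass) {
        if (configuredHooks == null) {
          return false;
        }
        // Split the comma-separated hook list and compare whole entries.
        return Arrays.stream(configuredHooks.split(","))
            .map(String::trim)
            .anyMatch(hookClass::equals);
      }

      public static void main(String[] args) {
        String hooks = "org.apache.hadoop.hive.ql.hooks.LineageLogger";
        System.out.println(hasHook(hooks,
            "org.apache.hadoop.hive.ql.hooks.LineageLogger")); // true
      }
    }
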
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
index 3a1a4af..5408dc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
@@ -82,6 +82,7 @@ public class FilterDesc extends AbstractOperatorDesc {
   private transient SampleDesc sampleDescr;
   //Is this a filter that should perform a comparison for sorted searches
   private boolean isSortedFilter;
+  private transient boolean isGenerated;
 
   public FilterDesc() {
   }
@@ -148,6 +149,19 @@ public class FilterDesc extends AbstractOperatorDesc {
     this.isSortedFilter = isSortedFilter;
   }
 
+  /**
+   * Some filters are generated or implied, which means they are not present in
+   * the query itself; they are added by the analyzer. For example, for an inner
+   * join we add filters to exclude rows whose join key values are null.
+   */
+  public boolean isGenerated() {
+    return isGenerated;
+  }
+
+  public void setGenerated(boolean isGenerated) {
+    this.isGenerated = isGenerated;
+  }
+
   @Override
   public Object clone() {
     FilterDesc filterDesc = new FilterDesc(getPredicate().clone(), getIsSamplingPred());

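The transient isGenerated flag added above lets FilterLineage skip analyzer-inserted predicates, such as the implicit null-excluding filters on inner-join keys, so that only user-written conditions show up as lineage predicates. A compact, hypothetical sketch of that consumer-side pattern (plain Java types, not Hive's FilterDesc):

    import java.util.ArrayList;
    import java.util.List;

    public class GeneratedFilterSketch {
      // Hypothetical stand-in for FilterDesc: an expression plus the new flag.
      record Filter(String expr, boolean generated) {}

      // Keep only predicates the user actually wrote, mirroring the
      // !filterDesc.isGenerated() check in FilterLineage.
      static List<String> userPredicates(List<Filter> filters) {
        List<String> out = new ArrayList<>();
        for (Filter f : filters) {
          if (!f.generated()) {
            out.add(f.expr());
          }
        }
        return out;
      }

      public static void main(String[] args) {
        List<Filter> filters = List.of(
            new Filter("(length(src1.key) > 2)", false),
            new Filter("src1.key is not null", true));  // added by the analyzer
        System.out.println(userPredicates(filters));    // [(length(src1.key) > 2)]
      }
    }
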
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index 3a4ea2f..64eed68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -309,6 +309,10 @@ public class JoinDesc extends AbstractOperatorDesc {
     return l;
   }
 
+  public ExprNodeDesc [][] getJoinKeys() {
+    return joinKeys;
+  }
+
   public JoinCondDesc[] getConds() {
     return conds;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
index e716ed2..223f0ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
@@ -62,6 +62,7 @@ public class LineageState {
   public LineageState() {
     dirToFop = new HashMap<Path, FileSinkOperator>();
     linfo = new LineageInfo();
+    index = new Index();
   }
 
   /**
@@ -109,12 +110,12 @@ public class LineageState {
   }
 
   /**
-   * Sets the index for the lineage state.
+   * Gets the index for the lineage state.
    *
-   * @param index The index derived from lineage context.
+   * @return The lineage index.
    */
-  public void setIndex(Index index) {
-    this.index = index;
+  public Index getIndex() {
+    return index;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
index e1cab79..f0435cb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
@@ -280,7 +280,7 @@ public class TestUpdateDeleteSemanticAnalyzer {
     // validate the plan
     sem.validate();
 
-    QueryPlan plan = new QueryPlan(query, sem, 0L, testName, null);
+    QueryPlan plan = new QueryPlan(query, sem, 0L, testName, null, null);
 
     return new ReturnInfo(tree, sem, plan);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/test/queries/clientpositive/lineage2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/lineage2.q b/ql/src/test/queries/clientpositive/lineage2.q
new file mode 100644
index 0000000..6bcd1d7
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/lineage2.q
@@ -0,0 +1,116 @@
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.LineageLogger;
+
+drop table if exists src2;
+create table src2 as select key key2, value value2 from src1;
+
+select * from src1 where key is not null and value is not null limit 3;
+select * from src1 where key > 10 and value > 'val' order by key limit 5;
+
+drop table if exists dest1;
+create table dest1 as select * from src1;
+insert into table dest1 select * from src2;
+
+select key k, dest1.value from dest1;
+select key from src1 union select key2 from src2 order by key;
+select key k from src1 union select key2 from src2 order by k;
+
+select key, count(1) a from dest1 group by key;
+select key k, count(*) from dest1 group by key;
+select key k, count(value) from dest1 group by key;
+select value, max(length(key)) from dest1 group by value;
+select value, max(length(key)) from dest1 group by value order by value limit 5;
+
+select key, length(value) from dest1;
+select length(value) + 3 from dest1;
+select 5 from dest1;
+select 3 * 5 from dest1;
+
+drop table if exists dest2;
+create table dest2 as select * from src1 JOIN src2 ON src1.key = src2.key2;
+insert overwrite table dest2 select * from src1 JOIN src2 ON src1.key = src2.key2;
+insert into table dest2 select * from src1 JOIN src2 ON src1.key = src2.key2;
+insert into table dest2
+  select * from src1 JOIN src2 ON length(src1.value) = length(src2.value2) + 1;
+
+select * from src1 where length(key) > 2;
+select * from src1 where length(key) > 2 and value > 'a';
+
+drop table if exists dest3;
+create table dest3 as
+  select * from src1 JOIN src2 ON src1.key = src2.key2 WHERE length(key) > 1;
+insert overwrite table dest2
+  select * from src1 JOIN src2 ON src1.key = src2.key2 WHERE length(key) > 3;
+
+drop table if exists dest_l1;
+CREATE TABLE dest_l1(key INT, value STRING) STORED AS TEXTFILE;
+
+INSERT OVERWRITE TABLE dest_l1
+SELECT j.*
+FROM (SELECT t1.key, p1.value
+      FROM src1 t1
+      LEFT OUTER JOIN src p1
+      ON (t1.key = p1.key)
+      UNION ALL
+      SELECT t2.key, p2.value
+      FROM src1 t2
+      LEFT OUTER JOIN src p2
+      ON (t2.key = p2.key)) j;
+
+drop table if exists emp;
+drop table if exists dept;
+drop table if exists project;
+drop table if exists tgt;
+create table emp(emp_id int, name string, mgr_id int, dept_id int);
+create table dept(dept_id int, dept_name string);
+create table project(project_id int, project_name string);
+create table tgt(dept_name string, name string,
+  emp_id int, mgr_id int, proj_id int, proj_name string);
+
+INSERT INTO TABLE tgt
+SELECT emd.dept_name, emd.name, emd.emp_id, emd.mgr_id, p.project_id, p.project_name
+FROM (
+  SELECT d.dept_name, em.name, em.emp_id, em.mgr_id, em.dept_id
+  FROM (
+    SELECT e.name, e.dept_id, e.emp_id emp_id, m.emp_id mgr_id
+    FROM emp e JOIN emp m ON e.emp_id = m.emp_id
+    ) em
+  JOIN dept d ON d.dept_id = em.dept_id
+  ) emd JOIN project p ON emd.dept_id = p.project_id;
+
+drop table if exists dest_l2;
+create table dest_l2 (id int, c1 tinyint, c2 int, c3 bigint) stored as textfile;
+insert into dest_l2 values(0, 1, 100, 10000);
+
+select * from (
+  select c1 + c2 x from dest_l2
+  union all
+  select sum(c3) y from (select c3 from dest_l2) v1) v2 order by x;
+
+drop table if exists dest_l3;
+create table dest_l3 (id int, c1 string, c2 string, c3 int) stored as textfile;
+insert into dest_l3 values(0, "s1", "s2", 15);
+
+select sum(a.c1) over (partition by a.c1 order by a.id)
+from dest_l2 a
+where a.c2 != 10
+group by a.c1, a.c2, a.id
+having count(a.c2) > 0;
+
+select sum(a.c1), count(b.c1), b.c2, b.c3
+from dest_l2 a join dest_l3 b on (a.id = b.id)
+where a.c2 != 10 and b.c3 > 0
+group by a.c1, a.c2, a.id, b.c1, b.c2, b.c3
+having count(a.c2) > 0
+order by b.c3 limit 5;
+
+drop table if exists t;
+create table t as
+select distinct a.c2, a.c3 from dest_l2 a
+inner join dest_l3 b on (a.id = b.id)
+where a.id > 0 and b.c3 = 15;
+
+SELECT substr(src1.key,1,1), count(DISTINCT substr(src1.value,5)),
+concat(substr(src1.key,1,1),sum(substr(src1.value,5)))
+from src1
+GROUP BY substr(src1.key,1,1);
+