Posted to commits@hive.apache.org by jx...@apache.org on 2015/07/02 20:26:21 UTC
[8/8] hive git commit: HIVE-11139: Emit more lineage information (Jimmy, reviewed by Szehon)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cdd1c7bf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cdd1c7bf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cdd1c7bf
Branch: refs/heads/master
Commit: cdd1c7bf775d788fc94eee6d33e8d630158195f1
Parents: 470d9c8
Author: Jimmy Xiang <jx...@cloudera.com>
Authored: Tue May 19 10:43:39 2015 -0700
Committer: Jimmy Xiang <jx...@cloudera.com>
Committed: Thu Jul 2 08:28:38 2015 -0700
----------------------------------------------------------------------
pom.xml | 1 +
ql/pom.xml | 5 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 2 +-
.../org/apache/hadoop/hive/ql/QueryPlan.java | 9 +-
.../hadoop/hive/ql/hooks/HookContext.java | 12 +
.../hadoop/hive/ql/hooks/LineageInfo.java | 96 +
.../hadoop/hive/ql/hooks/LineageLogger.java | 441 +++
.../ql/log/NoDeleteRollingFileAppender.java | 176 ++
.../ql/optimizer/lineage/ExprProcFactory.java | 91 +
.../hive/ql/optimizer/lineage/Generator.java | 16 +-
.../hive/ql/optimizer/lineage/LineageCtx.java | 79 +-
.../ql/optimizer/lineage/OpProcFactory.java | 228 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 51 +-
.../apache/hadoop/hive/ql/plan/FilterDesc.java | 14 +
.../apache/hadoop/hive/ql/plan/JoinDesc.java | 4 +
.../hadoop/hive/ql/session/LineageState.java | 9 +-
.../parse/TestUpdateDeleteSemanticAnalyzer.java | 2 +-
ql/src/test/queries/clientpositive/lineage2.q | 116 +
ql/src/test/queries/clientpositive/lineage3.q | 162 +
.../alter_partition_change_col.q.out | 8 +-
.../clientpositive/alter_table_cascade.q.out | 8 +-
.../test/results/clientpositive/combine2.q.out | 16 +-
.../clientpositive/groupby_sort_1_23.q.out | 10 +-
.../clientpositive/groupby_sort_skew_1_23.q.out | 10 +-
.../clientpositive/index_auto_mult_tables.q.out | 12 +
.../index_auto_mult_tables_compact.q.out | 9 +
.../clientpositive/index_auto_partitioned.q.out | 9 +
.../clientpositive/index_auto_update.q.out | 2 +
.../results/clientpositive/index_bitmap.q.out | 24 +
.../index_bitmap_auto_partitioned.q.out | 12 +
.../clientpositive/index_bitmap_rc.q.out | 24 +
.../results/clientpositive/index_compact.q.out | 18 +
.../clientpositive/index_compact_2.q.out | 18 +
ql/src/test/results/clientpositive/join34.q.out | 2 +-
ql/src/test/results/clientpositive/join35.q.out | 2 +-
.../test/results/clientpositive/lineage1.q.out | 4 +-
.../test/results/clientpositive/lineage2.q.out | 2905 ++++++++++++++++++
.../test/results/clientpositive/lineage3.q.out | 2473 +++++++++++++++
.../clientpositive/load_dyn_part13.q.out | 8 +-
.../results/clientpositive/multiMapJoin1.q.out | 10 +-
.../results/clientpositive/multi_insert.q.out | 32 +-
...i_insert_move_tasks_share_dependencies.q.out | 32 +-
.../orc_dictionary_threshold.q.out | 2 +-
ql/src/test/results/clientpositive/ptf.q.out | 28 +-
.../spark/groupby_sort_1_23.q.out | 10 +-
.../spark/groupby_sort_skew_1_23.q.out | 10 +-
.../results/clientpositive/spark/join34.q.out | 2 +-
.../results/clientpositive/spark/join35.q.out | 2 +-
.../clientpositive/spark/load_dyn_part13.q.out | 8 +-
.../clientpositive/spark/multi_insert.q.out | 32 +-
...i_insert_move_tasks_share_dependencies.q.out | 32 +-
.../test/results/clientpositive/spark/ptf.q.out | 28 +-
.../results/clientpositive/spark/union22.q.out | 4 +-
.../results/clientpositive/spark/union28.q.out | 4 +-
.../results/clientpositive/spark/union29.q.out | 4 +-
.../results/clientpositive/spark/union30.q.out | 4 +-
.../results/clientpositive/spark/union33.q.out | 4 +-
.../clientpositive/spark/union_date_trim.q.out | 4 +-
.../clientpositive/spark/union_remove_1.q.out | 4 +-
.../clientpositive/spark/union_remove_10.q.out | 2 +-
.../clientpositive/spark/union_remove_11.q.out | 2 +-
.../clientpositive/spark/union_remove_15.q.out | 8 +-
.../clientpositive/spark/union_remove_16.q.out | 8 +-
.../clientpositive/spark/union_remove_17.q.out | 4 +-
.../clientpositive/spark/union_remove_18.q.out | 24 +-
.../clientpositive/spark/union_remove_19.q.out | 12 +-
.../clientpositive/spark/union_remove_2.q.out | 2 +-
.../clientpositive/spark/union_remove_20.q.out | 4 +-
.../clientpositive/spark/union_remove_21.q.out | 2 +-
.../clientpositive/spark/union_remove_22.q.out | 12 +-
.../clientpositive/spark/union_remove_23.q.out | 2 +-
.../clientpositive/spark/union_remove_24.q.out | 4 +-
.../clientpositive/spark/union_remove_25.q.out | 16 +-
.../clientpositive/spark/union_remove_3.q.out | 2 +-
.../clientpositive/spark/union_remove_4.q.out | 4 +-
.../clientpositive/spark/union_remove_5.q.out | 2 +-
.../clientpositive/spark/union_remove_6.q.out | 8 +-
.../spark/union_remove_6_subq.q.out | 8 +-
.../clientpositive/spark/union_remove_7.q.out | 4 +-
.../clientpositive/spark/union_remove_8.q.out | 2 +-
.../clientpositive/spark/union_remove_9.q.out | 2 +-
.../clientpositive/spark/union_top_level.q.out | 4 +-
.../clientpositive/spark/vectorized_ptf.q.out | 28 +-
.../clientpositive/spark/windowing.q.out | 40 +-
.../temp_table_windowing_expressions.q.out | 8 +-
.../test/results/clientpositive/tez/ptf.q.out | 28 +-
.../clientpositive/tez/unionDistinct_1.q.out | 20 +-
.../clientpositive/tez/vectorized_ptf.q.out | 28 +-
.../test/results/clientpositive/union22.q.out | 4 +-
.../test/results/clientpositive/union28.q.out | 4 +-
.../test/results/clientpositive/union29.q.out | 4 +-
.../test/results/clientpositive/union30.q.out | 4 +-
.../test/results/clientpositive/union33.q.out | 4 +-
.../clientpositive/unionDistinct_1.q.out | 20 +-
.../clientpositive/union_date_trim.q.out | 4 +-
.../results/clientpositive/union_remove_1.q.out | 4 +-
.../clientpositive/union_remove_10.q.out | 2 +-
.../clientpositive/union_remove_11.q.out | 2 +-
.../clientpositive/union_remove_15.q.out | 8 +-
.../clientpositive/union_remove_16.q.out | 8 +-
.../clientpositive/union_remove_17.q.out | 4 +-
.../clientpositive/union_remove_18.q.out | 24 +-
.../clientpositive/union_remove_19.q.out | 12 +-
.../results/clientpositive/union_remove_2.q.out | 2 +-
.../clientpositive/union_remove_20.q.out | 4 +-
.../clientpositive/union_remove_21.q.out | 2 +-
.../clientpositive/union_remove_22.q.out | 12 +-
.../clientpositive/union_remove_23.q.out | 2 +-
.../clientpositive/union_remove_24.q.out | 4 +-
.../clientpositive/union_remove_25.q.out | 16 +-
.../results/clientpositive/union_remove_3.q.out | 2 +-
.../results/clientpositive/union_remove_4.q.out | 4 +-
.../results/clientpositive/union_remove_5.q.out | 2 +-
.../results/clientpositive/union_remove_6.q.out | 8 +-
.../clientpositive/union_remove_6_subq.q.out | 8 +-
.../results/clientpositive/union_remove_7.q.out | 4 +-
.../results/clientpositive/union_remove_8.q.out | 2 +-
.../results/clientpositive/union_remove_9.q.out | 2 +-
.../clientpositive/union_top_level.q.out | 4 +-
.../results/clientpositive/vectorized_ptf.q.out | 28 +-
.../test/results/clientpositive/windowing.q.out | 40 +-
.../clientpositive/windowing_expressions.q.out | 8 +-
122 files changed, 7398 insertions(+), 498 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f84f3e9..f2cb761 100644
--- a/pom.xml
+++ b/pom.xml
@@ -174,6 +174,7 @@
<felix.version>2.4.0</felix.version>
<curator.version>2.6.0</curator.version>
<jsr305.version>3.0.0</jsr305.version>
+ <gson.version>2.2.4</gson.version>
</properties>
<repositories>
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index b86b85d..6026c49 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -288,6 +288,11 @@
<version>${javaewah.version}</version>
</dependency>
<dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.iq80.snappy</groupId>
<artifactId>snappy</artifactId>
<version>${snappy.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 669e6be..e04165b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -446,7 +446,7 @@ public class Driver implements CommandProcessor {
String operationName = ctx.getExplain() ?
HiveOperation.EXPLAIN.getOperationName() : SessionState.get().getCommandType();
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
- operationName);
+ operationName, getSchema(sem, conf));
conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index a0d61f5..29a3939 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.FetchTask;
@@ -89,6 +90,7 @@ public class QueryPlan implements Serializable {
protected LineageInfo linfo;
private TableAccessInfo tableAccessInfo;
private ColumnAccessInfo columnAccessInfo;
+ private Schema resultSchema;
private HashMap<String, String> idToTableNameMap;
@@ -111,7 +113,7 @@ public class QueryPlan implements Serializable {
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
- String operationName) {
+ String operationName, Schema resultSchema) {
this.queryString = queryString;
rootTasks = new ArrayList<Task<? extends Serializable>>();
@@ -133,6 +135,7 @@ public class QueryPlan implements Serializable {
queryProperties = sem.getQueryProperties();
queryStartTime = startTime;
this.operationName = operationName;
+ this.resultSchema = resultSchema;
}
public String getQueryStr() {
@@ -683,6 +686,10 @@ public class QueryPlan implements Serializable {
this.outputs = outputs;
}
+ public Schema getResultSchema() {
+ return resultSchema;
+ }
+
public HashMap<String, String> getIdToTableNameMap() {
return idToTableNameMap;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
index 0c6a938..bed17e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -47,6 +48,7 @@ public class HookContext {
private Set<ReadEntity> inputs;
private Set<WriteEntity> outputs;
private LineageInfo linfo;
+ private Index depMap;
private UserGroupInformation ugi;
private HookType hookType;
final private Map<String, ContentSummary> inputPathToContentSummary;
@@ -67,8 +69,10 @@ public class HookContext {
outputs = queryPlan.getOutputs();
ugi = Utils.getUGI();
linfo= null;
+ depMap = null;
if(SessionState.get() != null){
linfo = SessionState.get().getLineageState().getLineageInfo();
+ depMap = SessionState.get().getLineageState().getIndex();
}
this.userName = userName;
this.ipAddress = ipAddress;
@@ -127,6 +131,14 @@ public class HookContext {
this.linfo = linfo;
}
+ public Index getIndex() {
+ return depMap;
+ }
+
+ public void setIndex(Index depMap) {
+ this.depMap = depMap;
+ }
+
public UserGroupInformation getUgi() {
return ugi;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
index f98b38b..fe0841e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
@@ -21,10 +21,13 @@ package org.apache.hadoop.hive.ql.hooks;
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.collections.SetUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -260,6 +263,25 @@ public class LineageInfo implements Serializable {
public String toString() {
return tabAlias + ":" + column;
}
+
+ @Override
+ public int hashCode() {
+ return (column != null ? column.hashCode() : 7)
+ + (tabAlias != null ? tabAlias.hashCode() : 11);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof BaseColumnInfo)) {
+ return false;
+ }
+ BaseColumnInfo ci = (BaseColumnInfo) obj;
+ return (column == null ? ci.column == null : column.equals(ci.column))
+ && (tabAlias == null ? ci.tabAlias == null : tabAlias.equals(ci.tabAlias));
+ }
}
public static class TableAliasInfo implements Serializable {
@@ -311,6 +333,25 @@ public class LineageInfo implements Serializable {
public String toString() {
return table.getDbName() + "." + table.getTableName() + "(" + alias + ")";
}
+
+ @Override
+ public int hashCode() {
+ return (alias != null ? alias.hashCode() : 7)
+ + (table != null ? table.hashCode() : 11);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof TableAliasInfo)) {
+ return false;
+ }
+ TableAliasInfo tabAlias = (TableAliasInfo) obj;
+ return StringUtils.equals(alias, tabAlias.alias)
+ && (table == null ? tabAlias.table == null : table.equals(tabAlias.table));
+ }
}
/**
@@ -387,6 +428,61 @@ public class LineageInfo implements Serializable {
}
/**
+ * This class tracks the predicate information for an operator.
+ */
+ public static class Predicate {
+
+ /**
+ * Expression string for the predicate.
+ */
+ private String expr;
+
+ /**
+ * The set of base columns that the predicate depends on.
+ */
+ private Set<BaseColumnInfo> baseCols = new LinkedHashSet<BaseColumnInfo>();
+
+ /**
+ * @return the expr
+ */
+ public String getExpr() {
+ return expr;
+ }
+
+ /**
+ * @param expr the expr to set
+ */
+ public void setExpr(String expr) {
+ this.expr = expr;
+ }
+
+ /**
+ * @return the baseCols
+ */
+ public Set<BaseColumnInfo> getBaseCols() {
+ return baseCols;
+ }
+
+ @Override
+ public int hashCode() {
+ return baseCols.hashCode() + (expr != null ? expr.hashCode() : 11);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof Predicate)) {
+ return false;
+ }
+ Predicate cond = (Predicate) obj;
+ return StringUtils.equals(cond.expr, expr)
+ && SetUtils.isEqualSet(cond.baseCols, baseCols);
+ }
+ }
+
+ /**
* The map contains an index from the (datacontainer, columnname) to the
* dependency vector for that tuple. This is used to generate the
* dependency vectors during the walk of the operator tree.
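[Editor's example] The value-based equals() and hashCode() added to Predicate above are what let identical predicates collapse to one entry when operators collect them into sets. A minimal sketch of that semantics (a hypothetical standalone class, assuming the hive-exec jar is on the classpath):

    import java.util.LinkedHashSet;
    import java.util.Set;

    import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;

    public class PredicateDedupSketch {
      public static void main(String[] args) {
        // Two predicates with the same expression (and the same, here empty,
        // sets of base columns) now compare equal...
        Predicate p1 = new Predicate();
        p1.setExpr("(src.key = dest.key)");
        Predicate p2 = new Predicate();
        p2.setExpr("(src.key = dest.key)");
        // ...so a LinkedHashSet, like the ones LineageCtx keeps per operator,
        // retains a single copy.
        Set<Predicate> conds = new LinkedHashSet<Predicate>();
        conds.add(p1);
        conds.add(p2);
        System.out.println(conds.size()); // prints 1
      }
    }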
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
new file mode 100644
index 0000000..fc32af7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
@@ -0,0 +1,441 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.SetUtils;
+import org.apache.commons.io.output.StringBuilderWriter;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;
+import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.gson.stream.JsonWriter;
+
+/**
+ * Implementation of a post execute hook that logs lineage info to a log file.
+ */
+public class LineageLogger implements ExecuteWithHookContext {
+
+ private static final Log LOG = LogFactory.getLog(LineageLogger.class);
+
+ private static final HashSet<String> OPERATION_NAMES = new HashSet<String>();
+
+ static {
+ OPERATION_NAMES.add(HiveOperation.QUERY.getOperationName());
+ OPERATION_NAMES.add(HiveOperation.CREATETABLE_AS_SELECT.getOperationName());
+ OPERATION_NAMES.add(HiveOperation.ALTERVIEW_AS.getOperationName());
+ OPERATION_NAMES.add(HiveOperation.CREATEVIEW.getOperationName());
+ }
+
+ private static final String FORMAT_VERSION = "1.0";
+
+ final static class Edge {
+ public static enum Type {
+ PROJECTION, PREDICATE
+ }
+
+ private Set<Vertex> sources;
+ private Set<Vertex> targets;
+ private String expr;
+ private Type type;
+
+ Edge(Set<Vertex> sources, Set<Vertex> targets, String expr, Type type) {
+ this.sources = sources;
+ this.targets = targets;
+ this.expr = expr;
+ this.type = type;
+ }
+ }
+
+ final static class Vertex {
+ public static enum Type {
+ COLUMN, TABLE
+ }
+ private Type type;
+ private String label;
+ private int id;
+
+ Vertex(String label) {
+ this(label, Type.COLUMN);
+ }
+
+ Vertex(String label, Type type) {
+ this.label = label;
+ this.type = type;
+ }
+
+ @Override
+ public int hashCode() {
+ return label.hashCode() + type.hashCode() * 3;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof Vertex)) {
+ return false;
+ }
+ Vertex vertex = (Vertex) obj;
+ return label.equals(vertex.label) && type == vertex.type;
+ }
+ }
+
+ @Override
+ public void run(HookContext hookContext) {
+ assert(hookContext.getHookType() == HookType.POST_EXEC_HOOK);
+ QueryPlan plan = hookContext.getQueryPlan();
+ Index index = hookContext.getIndex();
+ SessionState ss = SessionState.get();
+ if (ss != null && index != null
+ && OPERATION_NAMES.contains(plan.getOperationName())) {
+ try {
+ StringBuilderWriter out = new StringBuilderWriter(1024);
+ JsonWriter writer = new JsonWriter(out);
+ writer.setIndent(" ");
+
+ out.append("POSTHOOK: LINEAGE: ");
+ String queryStr = plan.getQueryStr().trim();
+ writer.beginObject();
+ writer.name("version").value(FORMAT_VERSION);
+ HiveConf conf = ss.getConf();
+ boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);
+ if (!testMode) {
+ // Don't emit user/timestamp info in test mode,
+ // so that the golden test output files stay deterministic.
+ long queryTime = plan.getQueryStartTime().longValue();
+ writer.name("user").value(hookContext.getUgi().getUserName());
+ writer.name("timestamp").value(queryTime/1000);
+ writer.name("jobIds");
+ writer.beginArray();
+ List<TaskRunner> tasks = hookContext.getCompleteTaskList();
+ if (tasks != null && !tasks.isEmpty()) {
+ for (TaskRunner task: tasks) {
+ String jobId = task.getTask().getJobID();
+ if (jobId != null) {
+ writer.value(jobId);
+ }
+ }
+ }
+ writer.endArray();
+ }
+ writer.name("engine").value(
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE));
+ writer.name("hash").value(getQueryHash(queryStr));
+ writer.name("queryText").value(queryStr);
+
+ List<Edge> edges = getEdges(plan, index);
+ Set<Vertex> vertices = getVertices(edges);
+ writeEdges(writer, edges);
+ writeVertices(writer, vertices);
+ writer.endObject();
+ writer.close();
+
+ // Log the lineage info
+ String lineage = out.toString();
+ if (testMode) {
+ // Log to console
+ log(lineage);
+ } else {
+ // In non-test mode, emit to a log file,
+ // which can be different from the normal hive.log.
+ // For example, a NoDeleteRollingFileAppender can
+ // log to a file with a different rolling policy.
+ LOG.info(lineage);
+ }
+ } catch (Throwable t) {
+ // Don't fail the query just because of any lineage issue.
+ log("Failed to log lineage graph, query is not affected\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(t));
+ }
+ }
+ }
+
+ /**
+ * Log an error to console if available.
+ */
+ private void log(String error) {
+ LogHelper console = SessionState.getConsole();
+ if (console != null) {
+ console.printError(error);
+ }
+ }
+
+ /**
+ * Based on the final select operator, find out all the target columns.
+ * For each target column, find out its sources based on the dependency index.
+ */
+ private List<Edge> getEdges(QueryPlan plan, Index index) {
+ List<FieldSchema> fieldSchemas = plan.getResultSchema().getFieldSchemas();
+ int fields = fieldSchemas == null ? 0 : fieldSchemas.size();
+ SelectOperator finalSelOp = index.getFinalSelectOp();
+ List<Edge> edges = new ArrayList<Edge>();
+ if (finalSelOp != null && fields > 0) {
+ Map<ColumnInfo, Dependency> colMap = index.getDependencies(finalSelOp);
+ List<Dependency> dependencies = colMap != null ? Lists.newArrayList(colMap.values()) : null;
+ if (dependencies == null || dependencies.size() != fields) {
+ log("Result schema has " + fields
+ + " fields, but we don't get as many dependencies");
+ } else {
+ String destTableName = null;
+ List<String> colNames = null;
+ // Based on the plan outputs, find out the target table name and column names.
+ for (WriteEntity output : plan.getOutputs()) {
+ if (output.getType() == Entity.Type.TABLE) {
+ org.apache.hadoop.hive.ql.metadata.Table t = output.getTable();
+ destTableName = t.getDbName() + "." + t.getTableName();
+ List<FieldSchema> cols = t.getCols();
+ if (cols != null && !cols.isEmpty()) {
+ colNames = Utilities.getColumnNamesFromFieldSchema(cols);
+ }
+ break;
+ }
+ }
+
+ // Go through each target column, generate the lineage edges.
+ Set<Vertex> allTargets = new LinkedHashSet<Vertex>();
+ Map<String, Vertex> allSources = new LinkedHashMap<String, Vertex>();
+ for (int i = 0; i < fields; i++) {
+ Vertex target = new Vertex(
+ getTargetFieldName(i, destTableName, colNames, fieldSchemas));
+ allTargets.add(target);
+ Dependency dep = dependencies.get(i);
+ String expr = dep.getExpr();
+ Set<Vertex> sources = createSourceVertices(allSources, dep.getBaseCols());
+ Edge edge = findSimilarEdgeBySources(edges, sources, expr, Edge.Type.PROJECTION);
+ if (edge == null) {
+ Set<Vertex> targets = new LinkedHashSet<Vertex>();
+ targets.add(target);
+ edges.add(new Edge(sources, targets, expr, Edge.Type.PROJECTION));
+ } else {
+ edge.targets.add(target);
+ }
+ }
+ Set<Predicate> conds = index.getPredicates(finalSelOp);
+ if (conds != null && !conds.isEmpty()) {
+ for (Predicate cond: conds) {
+ String expr = cond.getExpr();
+ Set<Vertex> sources = createSourceVertices(allSources, cond.getBaseCols());
+ Edge edge = findSimilarEdgeByTargets(edges, allTargets, expr, Edge.Type.PREDICATE);
+ if (edge == null) {
+ edges.add(new Edge(sources, allTargets, expr, Edge.Type.PREDICATE));
+ } else {
+ edge.sources.addAll(sources);
+ }
+ }
+ }
+ }
+ }
+ return edges;
+ }
+
+ /**
+ * Convert a list of columns to a set of vertices.
+ * Use cached vertices if possible.
+ */
+ private Set<Vertex> createSourceVertices(
+ Map<String, Vertex> srcVertexCache, Collection<BaseColumnInfo> baseCols) {
+ Set<Vertex> sources = new LinkedHashSet<Vertex>();
+ if (baseCols != null && !baseCols.isEmpty()) {
+ for(BaseColumnInfo col: baseCols) {
+ Table table = col.getTabAlias().getTable();
+ if (table.isTemporary()) {
+ // Ignore temporary tables
+ continue;
+ }
+ Vertex.Type type = Vertex.Type.TABLE;
+ String tableName = table.getDbName() + "." + table.getTableName();
+ FieldSchema fieldSchema = col.getColumn();
+ String label = tableName;
+ if (fieldSchema != null) {
+ type = Vertex.Type.COLUMN;
+ label = tableName + "." + fieldSchema.getName();
+ }
+ sources.add(getOrCreateVertex(srcVertexCache, label, type));
+ }
+ }
+ return sources;
+ }
+
+ /**
+ * Find a vertex in the cache, or create one if it is not present.
+ */
+ private Vertex getOrCreateVertex(
+ Map<String, Vertex> vertices, String label, Vertex.Type type) {
+ Vertex vertex = vertices.get(label);
+ if (vertex == null) {
+ vertex = new Vertex(label, type);
+ vertices.put(label, vertex);
+ }
+ return vertex;
+ }
+
+ /**
+ * Find an edge that has the same type, expression, and sources.
+ */
+ private Edge findSimilarEdgeBySources(
+ List<Edge> edges, Set<Vertex> sources, String expr, Edge.Type type) {
+ for (Edge edge: edges) {
+ if (edge.type == type && StringUtils.equals(edge.expr, expr)
+ && SetUtils.isEqualSet(edge.sources, sources)) {
+ return edge;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Find an edge that has the same type, expression, and targets.
+ */
+ private Edge findSimilarEdgeByTargets(
+ List<Edge> edges, Set<Vertex> targets, String expr, Edge.Type type) {
+ for (Edge edge: edges) {
+ if (edge.type == type && StringUtils.equals(edge.expr, expr)
+ && SetUtils.isEqualSet(edge.targets, targets)) {
+ return edge;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Generate normalized name for a given target column.
+ */
+ private String getTargetFieldName(int fieldIndex,
+ String destTableName, List<String> colNames, List<FieldSchema> fieldSchemas) {
+ String fieldName = fieldSchemas.get(fieldIndex).getName();
+ String[] parts = fieldName.split("\\.");
+ if (destTableName != null) {
+ String colName = parts[parts.length - 1];
+ if (colNames != null && !colNames.contains(colName)) {
+ colName = colNames.get(fieldIndex);
+ }
+ return destTableName + "." + colName;
+ }
+ if (parts.length == 2 && parts[0].startsWith("_u")) {
+ return parts[1];
+ }
+ return fieldName;
+ }
+
+ /**
+ * Get all the vertices of all edges. Targets at first,
+ * then sources. Assign id to each vertex.
+ */
+ private Set<Vertex> getVertices(List<Edge> edges) {
+ Set<Vertex> vertices = new LinkedHashSet<Vertex>();
+ for (Edge edge: edges) {
+ vertices.addAll(edge.targets);
+ }
+ for (Edge edge: edges) {
+ vertices.addAll(edge.sources);
+ }
+
+ // Assign ids to all vertices,
+ // targets first, then sources.
+ int id = 0;
+ for (Vertex vertex: vertices) {
+ vertex.id = id++;
+ }
+ return vertices;
+ }
+
+ /**
+ * Write out a JSON array of edges.
+ */
+ private void writeEdges(JsonWriter writer, List<Edge> edges) throws IOException {
+ writer.name("edges");
+ writer.beginArray();
+ for (Edge edge: edges) {
+ writer.beginObject();
+ writer.name("sources");
+ writer.beginArray();
+ for (Vertex vertex: edge.sources) {
+ writer.value(vertex.id);
+ }
+ writer.endArray();
+ writer.name("targets");
+ writer.beginArray();
+ for (Vertex vertex: edge.targets) {
+ writer.value(vertex.id);
+ }
+ writer.endArray();
+ if (edge.expr != null) {
+ writer.name("expression").value(edge.expr);
+ }
+ writer.name("edgeType").value(edge.type.name());
+ writer.endObject();
+ }
+ writer.endArray();
+ }
+
+ /**
+ * Write out a JSON array of vertices.
+ */
+ private void writeVertices(JsonWriter writer, Set<Vertex> vertices) throws IOException {
+ writer.name("vertices");
+ writer.beginArray();
+ for (Vertex vertex: vertices) {
+ writer.beginObject();
+ writer.name("id").value(vertex.id);
+ writer.name("vertexType").value(vertex.type.name());
+ writer.name("vertexId").value(vertex.label);
+ writer.endObject();
+ }
+ writer.endArray();
+ }
+
+ /**
+ * Generate the MD5 hash of the query string.
+ */
+ private String getQueryHash(String queryStr) {
+ Hasher hasher = Hashing.md5().newHasher();
+ hasher.putString(queryStr);
+ return hasher.hash().toString();
+ }
+}
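[Editor's example] LineageLogger is a post-execution hook, so nothing is emitted unless it is on the post-hook list. A minimal sketch of registering it (hive.exec.post.hooks is the standard post-hook property; the class name is the one defined above):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EnableLineageLogger {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Post-execution hooks are given as a comma-separated list of class names.
        conf.set("hive.exec.post.hooks",
            "org.apache.hadoop.hive.ql.hooks.LineageLogger");
      }
    }

For each qualifying operation (QUERY, CREATETABLE_AS_SELECT, CREATEVIEW, ALTERVIEW_AS), the hook then logs a single line prefixed with "POSTHOOK: LINEAGE: " followed by the JSON document built above: version, engine, hash and queryText, plus the edges and vertices arrays; user, timestamp and jobIds are added only outside test mode.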
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/log/NoDeleteRollingFileAppender.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/NoDeleteRollingFileAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/NoDeleteRollingFileAppender.java
new file mode 100644
index 0000000..be32f06
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/log/NoDeleteRollingFileAppender.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.log;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.Writer;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Layout;
+import org.apache.log4j.helpers.CountingQuietWriter;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.helpers.OptionConverter;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class NoDeleteRollingFileAppender extends FileAppender {
+ /**
+ * The default maximum file size is 10MB.
+ */
+ protected long maxFileSize = 10 * 1024 * 1024;
+
+ private long nextRollover = 0;
+
+ /**
+ * The default constructor simply calls its {@link FileAppender#FileAppender
+ * parent's constructor}.
+ */
+ public NoDeleteRollingFileAppender() {
+ }
+
+ /**
+ * Instantiate a NoDeleteRollingFileAppender and open the file designated by
+ * <code>filename</code>. The opened filename will become the output
+ * destination for this appender.
+ * <p>
+ * If the <code>append</code> parameter is true, the file will be appended to.
+ * Otherwise, the file designated by <code>filename</code> will be truncated
+ * before being opened.
+ */
+ public NoDeleteRollingFileAppender(Layout layout, String filename,
+ boolean append) throws IOException {
+ super(layout, filename, append);
+ }
+
+ /**
+ * Instantiate a NoDeleteRollingFileAppender and open the file designated by
+ * <code>filename</code>. The opened filename will become the output
+ * destination for this appender.
+ * <p>
+ * The file will be appended to.
+ */
+ public NoDeleteRollingFileAppender(Layout layout, String filename)
+ throws IOException {
+ super(layout, filename);
+ }
+
+ /**
+ * Get the maximum size that the output file is allowed to reach before being
+ * rolled over to backup files.
+ */
+ public long getMaximumFileSize() {
+ return maxFileSize;
+ }
+
+ /**
+ * Implements the usual roll over behavior.
+ * <p>
+ * The current log file is closed and kept under its timestamped name. A
+ * new timestamped <code>File</code> is created to receive further log output.
+ */
+ // synchronization not necessary since doAppend is already synced
+ public void rollOver() {
+ if (qw != null) {
+ long size = ((CountingQuietWriter) qw).getCount();
+ LogLog.debug("rolling over count=" + size);
+ // if operation fails, do not roll again until
+ // maxFileSize more bytes are written
+ nextRollover = size + maxFileSize;
+ }
+
+ this.closeFile(); // keep windows happy.
+
+ int p = fileName.lastIndexOf(".");
+ String file = p > 0 ? fileName.substring(0, p) : fileName;
+ try {
+ // This will also close the file. This is OK since multiple
+ // close operations are safe.
+ this.setFile(file, false, bufferedIO, bufferSize);
+ nextRollover = 0;
+ } catch (IOException e) {
+ if (e instanceof InterruptedIOException) {
+ Thread.currentThread().interrupt();
+ }
+ LogLog.error("setFile(" + file + ", false) call failed.", e);
+ }
+ }
+
+ public synchronized void setFile(String fileName, boolean append,
+ boolean bufferedIO, int bufferSize) throws IOException {
+ String newFileName = getLogFileName(fileName);
+ super.setFile(newFileName, append, this.bufferedIO, this.bufferSize);
+ if (append) {
+ File f = new File(newFileName);
+ ((CountingQuietWriter) qw).setCount(f.length());
+ }
+ }
+
+ /**
+ * Set the maximum size that the output file is allowed to reach before being
+ * rolled over to backup files.
+ * <p>
+ * This method is equivalent to {@link #setMaxFileSize} except that it is
+ * required for differentiating the setter taking a <code>long</code> argument
+ * from the setter taking a <code>String</code> argument by the JavaBeans
+ * {@link java.beans.Introspector Introspector}.
+ *
+ * @see #setMaxFileSize(String)
+ */
+ public void setMaximumFileSize(long maxFileSize) {
+ this.maxFileSize = maxFileSize;
+ }
+
+ /**
+ * Set the maximum size that the output file is allowed to reach before being
+ * rolled over to backup files.
+ * <p>
+ * In configuration files, the <b>MaxFileSize</b> option takes a long integer
+ * in the range 0 - 2^63. You can specify the value with the suffixes "KB",
+ * "MB" or "GB" so that the integer is interpreted as being expressed
+ * in kilobytes, megabytes or gigabytes, respectively. For example, the value
+ * "10KB" will be interpreted as 10240.
+ */
+ public void setMaxFileSize(String value) {
+ maxFileSize = OptionConverter.toFileSize(value, maxFileSize + 1);
+ }
+
+ protected void setQWForFiles(Writer writer) {
+ this.qw = new CountingQuietWriter(writer, errorHandler);
+ }
+
+ /**
+ * This method differentiates NoDeleteRollingFileAppender from its superclass.
+ */
+ protected void subAppend(LoggingEvent event) {
+ super.subAppend(event);
+
+ if (fileName != null && qw != null) {
+ long size = ((CountingQuietWriter) qw).getCount();
+ if (size >= maxFileSize && size >= nextRollover) {
+ rollOver();
+ }
+ }
+ }
+
+ // Mangle the file name by appending the current timestamp.
+ private static String getLogFileName(String oldFileName) {
+ return oldFileName + "." + Long.toString(System.currentTimeMillis());
+ }
+}
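[Editor's example] The LineageLogger comment above suggests routing the lineage log through this appender so records survive rotation. A minimal sketch of wiring it up with the log4j 1.x API (file path and layout pattern are illustrative only; the same can be done in a log4j properties file):

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.log.NoDeleteRollingFileAppender;
    import org.apache.log4j.Logger;
    import org.apache.log4j.PatternLayout;

    public class LineageAppenderSketch {
      public static void main(String[] args) throws IOException {
        // Each opened file gets a timestamp suffix; rollOver() starts a new
        // timestamped file and leaves the old one in place instead of deleting it.
        NoDeleteRollingFileAppender appender = new NoDeleteRollingFileAppender(
            new PatternLayout("%d{ISO8601} %-5p %c: %m%n"), "/tmp/hive_lineage.log");
        appender.setMaxFileSize("10MB"); // roll over once the file exceeds 10MB
        Logger.getLogger("org.apache.hadoop.hive.ql.hooks.LineageLogger")
            .addAppender(appender);
      }
    }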
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
index c930b80..46fe5db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
@@ -26,14 +26,19 @@ import java.util.List;
import java.util.Map;
import java.util.Stack;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyType;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.TableAliasInfo;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -43,6 +48,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -162,6 +168,91 @@ public class ExprProcFactory {
return new ColumnExprProcessor();
}
+ private static boolean findSourceColumn(
+ LineageCtx lctx, Predicate cond, String tabAlias, String alias) {
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpMap: lctx
+ .getParseCtx().getTopOps().entrySet()) {
+ Operator<? extends OperatorDesc> topOp = topOpMap.getValue();
+ if (topOp instanceof TableScanOperator) {
+ TableScanOperator tableScanOp = (TableScanOperator) topOp;
+ Table tbl = tableScanOp.getConf().getTableMetadata();
+ if (tbl.getTableName().equals(tabAlias)
+ || tabAlias.equals(tableScanOp.getConf().getAlias())) {
+ for (FieldSchema column: tbl.getCols()) {
+ if (column.getName().equals(alias)) {
+ TableAliasInfo table = new TableAliasInfo();
+ table.setTable(tbl.getTTable());
+ table.setAlias(tabAlias);
+ BaseColumnInfo colInfo = new BaseColumnInfo();
+ colInfo.setColumn(column);
+ colInfo.setTabAlias(table);
+ cond.getBaseCols().add(colInfo);
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Get the expression string of an expression node.
+ */
+ public static String getExprString(RowSchema rs, ExprNodeDesc expr,
+ LineageCtx lctx, Operator<? extends OperatorDesc> inpOp, Predicate cond) {
+ if (expr instanceof ExprNodeColumnDesc) {
+ ExprNodeColumnDesc col = (ExprNodeColumnDesc) expr;
+ String columnName = col.getColumn();
+ ColumnInfo ci = rs.getColumnInfo(columnName);
+ String alias = ci != null ? ci.getAlias() : columnName;
+ String internalName = ci != null ? ci.getInternalName() : columnName;
+ Dependency dep = lctx.getIndex().getDependency(inpOp, internalName);
+ String tabAlias = ci != null ? ci.getTabAlias() : col.getTabAlias();
+ if ((tabAlias == null || tabAlias.startsWith("_") || tabAlias.startsWith("$"))
+ && (dep != null && dep.getType() == DependencyType.SIMPLE)) {
+ List<BaseColumnInfo> baseCols = dep.getBaseCols();
+ if (baseCols != null && !baseCols.isEmpty()) {
+ BaseColumnInfo baseCol = baseCols.get(0);
+ tabAlias = baseCol.getTabAlias().getAlias();
+ alias = baseCol.getColumn().getName();
+ }
+ }
+ if (tabAlias != null && tabAlias.length() > 0
+ && !tabAlias.startsWith("_") && !tabAlias.startsWith("$")) {
+ if (cond != null && !findSourceColumn(lctx, cond, tabAlias, alias) && dep != null) {
+ cond.getBaseCols().addAll(dep.getBaseCols());
+ }
+ return tabAlias + "." + alias;
+ }
+
+ if (dep != null) {
+ if (cond != null) {
+ cond.getBaseCols().addAll(dep.getBaseCols());
+ }
+ if (dep.getExpr() != null) {
+ return dep.getExpr();
+ }
+ }
+ if (alias.startsWith("_")) {
+ ci = inpOp.getSchema().getColumnInfo(columnName);
+ if (ci != null) {
+ alias = ci.getAlias();
+ }
+ }
+ return alias;
+ } else if (expr instanceof ExprNodeGenericFuncDesc) {
+ ExprNodeGenericFuncDesc func = (ExprNodeGenericFuncDesc) expr;
+ List<ExprNodeDesc> children = func.getChildren();
+ String[] childrenExprStrings = new String[children.size()];
+ for (int i = 0; i < childrenExprStrings.length; i++) {
+ childrenExprStrings[i] = getExprString(rs, children.get(i), lctx, inpOp, cond);
+ }
+ return func.getGenericUDF().getDisplayString(childrenExprStrings);
+ }
+ return expr.getExprString();
+ }
+
/**
* Gets the expression dependencies for the expression.
*
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
index 51bef04..9a5cf55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.optimizer.Transform;
+import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -59,8 +61,11 @@ public class Generator implements Transform {
@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
+ Index index = SessionState.get() != null ?
+ SessionState.get().getLineageState().getIndex() : new Index();
+
// Create the lineage context
- LineageCtx lCtx = new LineageCtx(pctx);
+ LineageCtx lCtx = new LineageCtx(pctx, index);
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
@@ -83,7 +88,9 @@ public class Generator implements Transform {
opRules.put(new RuleRegExp("R9", LateralViewJoinOperator.getOperatorName() + "%"),
OpProcFactory.getLateralViewJoinProc());
opRules.put(new RuleRegExp("R10", PTFOperator.getOperatorName() + "%"),
- OpProcFactory.getTransformProc());
+ OpProcFactory.getTransformProc());
+ opRules.put(new RuleRegExp("R11", FilterOperator.getOperatorName() + "%"),
+ OpProcFactory.getFilterProc());
// The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(OpProcFactory.getDefaultProc(), opRules, lCtx);
@@ -94,11 +101,6 @@ public class Generator implements Transform {
topNodes.addAll(pctx.getTopOps().values());
ogw.startWalking(topNodes, null);
- // Transfer the index from the lineage context to the session state.
- if (SessionState.get() != null) {
- SessionState.get().getLineageState().setIndex(lCtx.getIndex());
- }
-
return pctx;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
index cef24e3..d26d8da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer.lineage;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -26,9 +27,11 @@ import java.util.Set;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -44,11 +47,6 @@ public class LineageCtx implements NodeProcessorCtx {
public static class Index {
/**
- * Serial Version UID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
* The map contains an index from the (operator, columnInfo) to the
* dependency vector for that tuple. This is used to generate the
* dependency vectors during the walk of the operator tree.
@@ -57,12 +55,20 @@ public class LineageCtx implements NodeProcessorCtx {
LinkedHashMap<ColumnInfo, Dependency>> depMap;
/**
+ * A map from an operator to its set of predicates.
+ */
+ private final Map<Operator<? extends OperatorDesc>, Set<Predicate>> condMap;
+
+ private SelectOperator finalSelectOp;
+
+ /**
* Constructor.
*/
public Index() {
depMap =
new LinkedHashMap<Operator<? extends OperatorDesc>,
LinkedHashMap<ColumnInfo, Dependency>>();
+ condMap = new HashMap<Operator<? extends OperatorDesc>, Set<Predicate>>();
}
/**
@@ -83,6 +89,28 @@ public class LineageCtx implements NodeProcessorCtx {
}
/**
+ * Gets the dependency for an (operator, ColumnInfo) tuple,
+ * where the ColumnInfo is matched by its internal name.
+ *
+ * @param op The operator whose dependency is being inspected.
+ * @param internalName The internal name of the column info
+ * @return Dependency for that particular operator, ColumnInfo tuple.
+ * null if no dependency is found.
+ */
+ public Dependency getDependency(
+ Operator<? extends OperatorDesc> op, String internalName) {
+ Map<ColumnInfo, Dependency> colMap = depMap.get(op);
+ if (colMap != null) {
+ for (Map.Entry<ColumnInfo, Dependency> e: colMap.entrySet()) {
+ if (e.getKey().getInternalName().equals(internalName)) {
+ return e.getValue();
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
* Puts the dependency for an operator, columninfo tuple.
* @param op The operator whose dependency is being inserted.
* @param col The column info whose dependency is being inserted.
@@ -124,7 +152,43 @@ public class LineageCtx implements NodeProcessorCtx {
}
}
+ public Map<ColumnInfo, Dependency> getDependencies(Operator<? extends OperatorDesc> op) {
+ return depMap.get(op);
+ }
+
+ public void addPredicate(Operator<? extends OperatorDesc> op, Predicate cond) {
+ Set<Predicate> conds = condMap.get(op);
+ if (conds == null) {
+ conds = new LinkedHashSet<Predicate>();
+ condMap.put(op, conds);
+ }
+ conds.add(cond);
+ }
+
+ public void copyPredicates(Operator<? extends OperatorDesc> srcOp,
+ Operator<? extends OperatorDesc> tgtOp) {
+ Set<Predicate> conds = getPredicates(srcOp);
+ if (conds != null) {
+ for (Predicate cond: conds) {
+ addPredicate(tgtOp, cond);
+ }
+ }
+ }
+
+ public Set<Predicate> getPredicates(Operator<? extends OperatorDesc> op) {
+ return condMap.get(op);
+ }
+
+ public void setFinalSelectOp(SelectOperator sop) {
+ finalSelectOp = sop;
+ }
+
+ public SelectOperator getFinalSelectOp() {
+ return finalSelectOp;
+ }
+
public void clear() {
+ finalSelectOp = null;
depMap.clear();
}
}
@@ -145,9 +209,10 @@ public class LineageCtx implements NodeProcessorCtx {
* Constructor.
*
* @param pctx The parse context that is used to get table metadata information.
+ * @param index The dependency map.
*/
- public LineageCtx(ParseContext pctx) {
- index = new Index();
+ public LineageCtx(ParseContext pctx, Index index) {
+ this.index = index;
this.pctx = pctx;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
index 5957ac0..f670db8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
@@ -27,18 +27,20 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.ForwardOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyType;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo.Predicate;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.TableAliasInfo;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -56,6 +59,8 @@ import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -72,6 +77,7 @@ public class OpProcFactory {
*
* @return Operator The parent operator in the current path.
*/
+ @SuppressWarnings("unchecked")
protected static Operator<? extends OperatorDesc> getParent(Stack<Node> stack) {
return (Operator<? extends OperatorDesc>)Utils.getNthAncestor(stack, 1);
}
@@ -89,8 +95,10 @@ public class OpProcFactory {
LineageCtx lCtx = (LineageCtx) procCtx;
// The operators
+ @SuppressWarnings("unchecked")
Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>)nd;
Operator<? extends OperatorDesc> inpOp = getParent(stack);
+ lCtx.getIndex().copyPredicates(inpOp, op);
// Create a single dependency list by concatenating the dependencies of all
// the cols
@@ -105,16 +113,27 @@ public class OpProcFactory {
Dependency d = lCtx.getIndex().getDependency(inpOp, ci);
if (d != null) {
new_type = LineageCtx.getNewDependencyType(d.getType(), new_type);
- col_set.addAll(d.getBaseCols());
+ if (!ci.isHiddenVirtualCol()) {
+ col_set.addAll(d.getBaseCols());
+ }
}
}
dep.setType(new_type);
dep.setBaseCols(new ArrayList<BaseColumnInfo>(col_set));
+ boolean isScript = op instanceof ScriptOperator;
+
// This dependency is then set for all the colinfos of the script operator
for(ColumnInfo ci : op.getSchema().getSignature()) {
- lCtx.getIndex().putDependency(op, ci, dep);
+ Dependency d = dep;
+ if (!isScript) {
+ Dependency dep_ci = lCtx.getIndex().getDependency(inpOp, ci);
+ if (dep_ci != null) {
+ d = dep_ci;
+ }
+ }
+ lCtx.getIndex().putDependency(op, ci, d);
}
return null;
@@ -167,8 +186,6 @@ public class OpProcFactory {
// Populate the dependency
dep.setType(LineageInfo.DependencyType.SIMPLE);
- // TODO: Find out how to get the expression here.
- dep.setExpr(null);
dep.setBaseCols(new ArrayList<BaseColumnInfo>());
dep.getBaseCols().add(bci);
@@ -189,7 +206,7 @@ public class OpProcFactory {
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- // Assert that there is atleast one item in the stack. This should never
+ // Assert that there is at least one item in the stack. This should never
// be called for leafs.
assert(!stack.isEmpty());
@@ -200,6 +217,12 @@ public class OpProcFactory {
// The input operator to the join is always a reduce sink operator
ReduceSinkOperator inpOp = (ReduceSinkOperator)getParent(stack);
+ lCtx.getIndex().copyPredicates(inpOp, op);
+ Predicate cond = getPredicate(op, lCtx);
+ if (cond != null) {
+ lCtx.getIndex().addPredicate(op, cond);
+ }
+
ReduceSinkDesc rd = inpOp.getConf();
int tag = rd.getTag();
@@ -221,6 +244,59 @@ public class OpProcFactory {
return null;
}
+ private Predicate getPredicate(JoinOperator jop, LineageCtx lctx) {
+ List<Operator<? extends OperatorDesc>> parentOperators = jop.getParentOperators();
+ JoinDesc jd = jop.getConf();
+ ExprNodeDesc [][] joinKeys = jd.getJoinKeys();
+ if (joinKeys == null || parentOperators == null || parentOperators.size() < 2) {
+ return null;
+ }
+ LineageCtx.Index index = lctx.getIndex();
+ for (Operator<? extends OperatorDesc> op: parentOperators) {
+ if (index.getDependencies(op) == null) {
+ return null;
+ }
+ }
+ Predicate cond = new Predicate();
+ JoinCondDesc[] conds = jd.getConds();
+ int parents = parentOperators.size();
+ StringBuilder sb = new StringBuilder("(");
+ for (int i = 0; i < conds.length; i++) {
+ if (i != 0) {
+ sb.append(" AND ");
+ }
+ int left = conds[i].getLeft();
+ int right = conds[i].getRight();
+ if (joinKeys.length < left
+ || joinKeys[left].length == 0
+ || joinKeys.length < right
+ || joinKeys[right].length == 0
+ || parents < left
+ || parents < right) {
+ return null;
+ }
+ ExprNodeDesc expr = joinKeys[left][0];
+ Operator<? extends OperatorDesc> op = parentOperators.get(left);
+ List<Operator<? extends OperatorDesc>> p = op.getParentOperators();
+ if (p == null || p.isEmpty()) {
+ return null;
+ }
+ sb.append(ExprProcFactory.getExprString(op.getSchema(),
+ expr, lctx, p.get(0), cond));
+ sb.append(" = ");
+ expr = joinKeys[right][0];
+ op = parentOperators.get(right);
+ p = op.getParentOperators();
+ if (p == null || p.isEmpty()) {
+ return null;
+ }
+ sb.append(ExprProcFactory.getExprString(op.getSchema(),
+ expr, lctx, p.get(0), cond));
+ }
+ sb.append(")");
+ cond.setExpr(sb.toString());
+ return cond;
+ }
}
/**
@@ -231,7 +307,7 @@ public class OpProcFactory {
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- // Assert that there is atleast one item in the stack. This should never
+ // Assert that there is at least one item in the stack. This should never
// be called for leaves.
assert(!stack.isEmpty());
@@ -241,6 +317,7 @@ public class OpProcFactory {
boolean isUdtfPath = true;
Operator<? extends OperatorDesc> inpOp = getParent(stack);
ArrayList<ColumnInfo> cols = inpOp.getSchema().getSignature();
+ lCtx.getIndex().copyPredicates(inpOp, op);
if (inpOp instanceof SelectOperator) {
isUdtfPath = false;
@@ -295,11 +372,31 @@ public class OpProcFactory {
// Otherwise we treat this as a normal select operator and look at
// the expressions.
- ArrayList<ColumnInfo> col_infos = sop.getSchema().getSignature();
+ Operator<? extends OperatorDesc> inpOp = getParent(stack);
+ lctx.getIndex().copyPredicates(inpOp, sop);
+
+ RowSchema rs = sop.getSchema();
+ ArrayList<ColumnInfo> col_infos = rs.getSignature();
int cnt = 0;
for(ExprNodeDesc expr : sop.getConf().getColList()) {
- lctx.getIndex().putDependency(sop, col_infos.get(cnt++),
- ExprProcFactory.getExprDependency(lctx, getParent(stack), expr));
+ Dependency dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr);
+ if (dep != null && dep.getExpr() == null && (dep.getBaseCols().isEmpty()
+ || dep.getType() != LineageInfo.DependencyType.SIMPLE)) {
+ dep.setExpr(ExprProcFactory.getExprString(rs, expr, lctx, inpOp, null));
+ }
+ lctx.getIndex().putDependency(sop, col_infos.get(cnt++), dep);
+ }
+
+ Operator<? extends OperatorDesc> op = null;
+ if (!sop.getChildOperators().isEmpty()) {
+ op = sop.getChildOperators().get(0);
+ if (!op.getChildOperators().isEmpty() && op instanceof LimitOperator) {
+ op = op.getChildOperators().get(0);
+ }
+ }
+ if (op == null || (op.getChildOperators().isEmpty()
+ && op instanceof FileSinkOperator)) {
+ lctx.getIndex().setFinalSelectOp(sop);
}
return null;
@@ -319,6 +416,7 @@ public class OpProcFactory {
GroupByOperator gop = (GroupByOperator)nd;
ArrayList<ColumnInfo> col_infos = gop.getSchema().getSignature();
Operator<? extends OperatorDesc> inpOp = getParent(stack);
+ lctx.getIndex().copyPredicates(inpOp, gop);
int cnt = 0;
for(ExprNodeDesc expr : gop.getConf().getKeys()) {
@@ -326,21 +424,64 @@ public class OpProcFactory {
ExprProcFactory.getExprDependency(lctx, inpOp, expr));
}
+ // If this is a reduce-side GroupBy operator, check if there is
+ // a corresponding map-side one. If so, some expressions may have
+ // already been resolved on the map side.
+ boolean reduceSideGop = (inpOp instanceof ReduceSinkOperator)
+ && (Utils.getNthAncestor(stack, 2) instanceof GroupByOperator);
+
+ RowSchema rs = gop.getSchema();
for(AggregationDesc agg : gop.getConf().getAggregators()) {
// Concatenate the dependencies of all the parameters to
// create the new dependency
Dependency dep = new Dependency();
DependencyType new_type = LineageInfo.DependencyType.EXPRESSION;
- // TODO: Get the actual string here.
- dep.setExpr(null);
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
LinkedHashSet<BaseColumnInfo> bci_set = new LinkedHashSet<BaseColumnInfo>();
for(ExprNodeDesc expr : agg.getParameters()) {
+ if (first) {
+ first = false;
+ } else {
+ sb.append(", ");
+ }
Dependency expr_dep = ExprProcFactory.getExprDependency(lctx, inpOp, expr);
- if (expr_dep != null) {
+ if (expr_dep != null && !expr_dep.getBaseCols().isEmpty()) {
new_type = LineageCtx.getNewDependencyType(expr_dep.getType(), new_type);
bci_set.addAll(expr_dep.getBaseCols());
+ if (expr_dep.getType() == LineageInfo.DependencyType.SIMPLE) {
+ BaseColumnInfo col = expr_dep.getBaseCols().get(0);
+ Table t = col.getTabAlias().getTable();
+ if (t != null) {
+ sb.append(t.getDbName()).append(".").append(t.getTableName()).append(".");
+ }
+ sb.append(col.getColumn().getName());
+ }
+ }
+ if (expr_dep == null || expr_dep.getBaseCols().isEmpty()
+ || expr_dep.getType() != LineageInfo.DependencyType.SIMPLE) {
+ sb.append(expr_dep != null && expr_dep.getExpr() != null ? expr_dep.getExpr() :
+ ExprProcFactory.getExprString(rs, expr, lctx, inpOp, null));
}
}
+ String expr = sb.toString();
+ String udafName = agg.getGenericUDAFName();
+ if (!(reduceSideGop && expr.startsWith(udafName))) {
+ sb.setLength(0); // reset the buffer
+ sb.append(udafName);
+ sb.append("(");
+ if (agg.getDistinct()) {
+ sb.append("DISTINCT ");
+ }
+ sb.append(expr);
+ if (first) {
+ // No parameter, count(*)
+ sb.append("*");
+ }
+ sb.append(")");
+ expr = sb.toString();
+ }
+ dep.setExpr(expr);
// If the bci_set is empty, this means that the inputs to this
// aggregate function were all constants (e.g. count(1)). In this case
@@ -390,13 +531,11 @@ public class OpProcFactory {
*/
public static class UnionLineage extends DefaultLineage implements NodeProcessor {
- protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName());
-
@SuppressWarnings("unchecked")
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- // Assert that there is atleast one item in the stack. This should never
+ // Assert that there is at least one item in the stack. This should never
// be called for leaves.
assert(!stack.isEmpty());
@@ -407,6 +546,7 @@ public class OpProcFactory {
// Get the row schema of the input operator.
// The row schema of the parent operator
Operator<? extends OperatorDesc> inpOp = getParent(stack);
+ lCtx.getIndex().copyPredicates(inpOp, op);
RowSchema rs = op.getSchema();
ArrayList<ColumnInfo> inp_cols = inpOp.getSchema().getSignature();
int cnt = 0;
@@ -425,13 +565,10 @@ public class OpProcFactory {
*/
public static class ReduceSinkLineage implements NodeProcessor {
- protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName());
-
- @SuppressWarnings("unchecked")
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- // Assert that there is atleast one item in the stack. This should never
+ // Assert that there is at least one item in the stack. This should never
// be called for leaves.
assert(!stack.isEmpty());
@@ -440,6 +577,7 @@ public class OpProcFactory {
ReduceSinkOperator rop = (ReduceSinkOperator)nd;
Operator<? extends OperatorDesc> inpOp = getParent(stack);
+ lCtx.getIndex().copyPredicates(inpOp, rop);
int cnt = 0;
// The keys are included only in case the reduce sink feeds into
@@ -492,18 +630,56 @@ public class OpProcFactory {
}
/**
+ * Filter processor.
+ */
+ public static class FilterLineage implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ // Assert that there is at least one item in the stack. This should never
+ // be called for leaves.
+ assert(!stack.isEmpty());
+
+ // LineageCtx
+ LineageCtx lCtx = (LineageCtx) procCtx;
+ FilterOperator fop = (FilterOperator)nd;
+
+ // Get the row schema of the input operator.
+ // The row schema of the parent operator
+ Operator<? extends OperatorDesc> inpOp = getParent(stack);
+ lCtx.getIndex().copyPredicates(inpOp, fop);
+ FilterDesc filterDesc = fop.getConf();
+ RowSchema rs = fop.getSchema();
+ if (!filterDesc.isGenerated()) {
+ Predicate cond = new Predicate();
+ cond.setExpr(ExprProcFactory.getExprString(
+ rs, filterDesc.getPredicate(), lCtx, inpOp, cond));
+ lCtx.getIndex().addPredicate(fop, cond);
+ }
+
+ ArrayList<ColumnInfo> inp_cols = inpOp.getSchema().getSignature();
+ int cnt = 0;
+ for(ColumnInfo ci : rs.getSignature()) {
+ lCtx.getIndex().putDependency(fop, ci,
+ lCtx.getIndex().getDependency(inpOp, inp_cols.get(cnt++)));
+ }
+
+ return null;
+ }
+ }
+
+ /**
* Default processor. This simply passes the input dependencies through,
* unchanged, to the output dependencies.
*/
public static class DefaultLineage implements NodeProcessor {
- protected static final Log LOG = LogFactory.getLog(OpProcFactory.class.getName());
-
@SuppressWarnings("unchecked")
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- // Assert that there is atleast one item in the stack. This should never
+ // Assert that there is at least one item in the stack. This should never
// be called for leaves.
assert(!stack.isEmpty());
@@ -514,6 +690,7 @@ public class OpProcFactory {
// Get the row schema of the input operator.
// The row schema of the parent operator
Operator<? extends OperatorDesc> inpOp = getParent(stack);
+ lCtx.getIndex().copyPredicates(inpOp, op);
RowSchema rs = op.getSchema();
ArrayList<ColumnInfo> inp_cols = inpOp.getSchema().getSignature();
int cnt = 0;
@@ -561,4 +738,7 @@ public class OpProcFactory {
return new DefaultLineage();
}
+ public static NodeProcessor getFilterProc() {
+ return new FilterLineage();
+ }
}
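A sketch of what the new FilterLineage processor captures, assuming the
string rendering produced by ExprProcFactory.getExprString (the exact
predicate text below is illustrative, not taken from the patch). Only
hand-written predicates are recorded; filters flagged by isGenerated(),
such as the analyzer-injected join-key and sampling filters, are skipped:

    select * from src1 where length(key) > 2;
    -- predicate recorded against the FilterOperator, roughly:
    -- (length(default.src1.key) > 2)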
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index f41668b..b02374e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -114,8 +114,11 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.Optimizer;
+import org.apache.hadoop.hive.ql.optimizer.Transform;
import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.HiveOpConverterPostProc;
+import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec.SpecType;
import org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
@@ -206,7 +209,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.InputFormat;
@@ -2886,8 +2888,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
return input;
}
- Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new FilterDesc(filterPred, false),
+ FilterDesc filterDesc = new FilterDesc(filterPred, false);
+ filterDesc.setGenerated(true);
+ Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(filterDesc,
new RowSchema(inputRR.getColumnInfos()), input), inputRR);
if (LOG.isDebugEnabled()) {
@@ -5394,6 +5397,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
OpParseContext inputCtx = opParseCtx.get(input);
RowResolver inputRR = inputCtx.getRowResolver();
FilterDesc orFilterDesc = new FilterDesc(previous, false);
+ orFilterDesc.setGenerated(true);
selectInput = putOpInsertMap(OperatorFactory.getAndMakeChild(
orFilterDesc, new RowSchema(
@@ -9459,9 +9463,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
LOG.info("No need for sample filter");
ExprNodeDesc samplePredicate = genSamplePredicate(ts, tabBucketCols,
colsEqual, alias, rwsch, qb.getMetaData(), null);
- op = OperatorFactory.getAndMakeChild(new FilterDesc(
- samplePredicate, true, new SampleDesc(ts.getNumerator(), ts
- .getDenominator(), tabBucketCols, true)),
+ FilterDesc filterDesc = new FilterDesc(
+ samplePredicate, true, new SampleDesc(ts.getNumerator(),
+ ts.getDenominator(), tabBucketCols, true));
+ filterDesc.setGenerated(true);
+ op = OperatorFactory.getAndMakeChild(filterDesc,
new RowSchema(rwsch.getColumnInfos()), top);
} else {
// need to add filter
@@ -9469,8 +9475,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
LOG.info("Need sample filter");
ExprNodeDesc samplePredicate = genSamplePredicate(ts, tabBucketCols,
colsEqual, alias, rwsch, qb.getMetaData(), null);
- op = OperatorFactory.getAndMakeChild(new FilterDesc(
- samplePredicate, true),
+ FilterDesc filterDesc = new FilterDesc(samplePredicate, true);
+ filterDesc.setGenerated(true);
+ op = OperatorFactory.getAndMakeChild(filterDesc,
new RowSchema(rwsch.getColumnInfos()), top);
}
} else {
@@ -9499,11 +9506,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
qb.getParseInfo().setTabSample(alias, tsSample);
ExprNodeDesc samplePred = genSamplePredicate(tsSample, tab
.getBucketCols(), true, alias, rwsch, qb.getMetaData(), null);
- op = OperatorFactory
- .getAndMakeChild(new FilterDesc(samplePred, true,
- new SampleDesc(tsSample.getNumerator(), tsSample
- .getDenominator(), tab.getBucketCols(), true)),
- new RowSchema(rwsch.getColumnInfos()), top);
+ FilterDesc filterDesc = new FilterDesc(samplePred, true,
+ new SampleDesc(tsSample.getNumerator(), tsSample
+ .getDenominator(), tab.getBucketCols(), true));
+ filterDesc.setGenerated(true);
+ op = OperatorFactory.getAndMakeChild(filterDesc,
+ new RowSchema(rwsch.getColumnInfos()), top);
LOG.info("No need for sample filter");
} else {
// The table is not bucketed, add a dummy filter :: rand()
@@ -9517,8 +9525,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
.valueOf(460476415)));
ExprNodeDesc samplePred = genSamplePredicate(tsSample, null, false,
alias, rwsch, qb.getMetaData(), randFunc);
- op = OperatorFactory.getAndMakeChild(new FilterDesc(
- samplePred, true),
+ FilterDesc filterDesc = new FilterDesc(samplePred, true);
+ filterDesc.setGenerated(true);
+ op = OperatorFactory.getAndMakeChild(filterDesc,
new RowSchema(rwsch.getColumnInfos()), top);
}
}
@@ -10150,6 +10159,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
} catch (HiveException e) {
throw new SemanticException(e);
}
+
+ // Generate lineage info for create view statements
+ // if LineageLogger hook is configured.
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.POSTEXECHOOKS).contains(
+ "org.apache.hadoop.hive.ql.hooks.LineageLogger")) {
+ ArrayList<Transform> transformations = new ArrayList<Transform>();
+ transformations.add(new HiveOpConverterPostProc());
+ transformations.add(new Generator());
+ for (Transform t : transformations) {
+ pCtx = t.transform(pCtx);
+ }
+ }
return;
}
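The guard above keys off HiveConf.ConfVars.POSTEXECHOOKS, so the extra
transformations run for create view statements only when the lineage hook is
registered. The new lineage2.q test (included in this patch) enables it
exactly this way:

    set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.LineageLogger;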
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
index 3a1a4af..5408dc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FilterDesc.java
@@ -82,6 +82,7 @@ public class FilterDesc extends AbstractOperatorDesc {
private transient SampleDesc sampleDescr;
//Is this a filter that should perform a comparison for sorted searches
private boolean isSortedFilter;
+ private transient boolean isGenerated;
public FilterDesc() {
}
@@ -148,6 +149,19 @@ public class FilterDesc extends AbstractOperatorDesc {
this.isSortedFilter = isSortedFilter;
}
+ /**
+ * Some filters are generated or implied, which means they do not appear in
+ * the query itself; they are added by the analyzer. For example, for an
+ * inner join, we add filters to exclude rows whose join key values are null.
+ */
+ public boolean isGenerated() {
+ return isGenerated;
+ }
+
+ public void setGenerated(boolean isGenerated) {
+ this.isGenerated = isGenerated;
+ }
+
@Override
public Object clone() {
FilterDesc filterDesc = new FilterDesc(getPredicate().clone(), getIsSamplingPred());
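To make the isGenerated javadoc concrete (an illustrative example under the
usual inner-join semantics, not text from the patch): for

    select * from src1 JOIN src2 ON src1.key = src2.key2;
    -- the analyzer injects filters equivalent to
    --   src1.key is not null
    --   src2.key2 is not null
    -- below each table scan; these carry isGenerated == true and are
    -- therefore ignored by the FilterLineage processor in OpProcFactory.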
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index 3a4ea2f..64eed68 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -309,6 +309,10 @@ public class JoinDesc extends AbstractOperatorDesc {
return l;
}
+ public ExprNodeDesc[][] getJoinKeys() {
+ return joinKeys;
+ }
+
public JoinCondDesc[] getConds() {
return conds;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
index e716ed2..223f0ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
@@ -62,6 +62,7 @@ public class LineageState {
public LineageState() {
dirToFop = new HashMap<Path, FileSinkOperator>();
linfo = new LineageInfo();
+ index = new Index();
}
/**
@@ -109,12 +110,12 @@ public class LineageState {
}
/**
- * Sets the index for the lineage state.
+ * Gets the index for the lineage state.
*
- * @param index The index derived from lineage context.
+ * @return The lineage index.
*/
- public void setIndex(Index index) {
- this.index = index;
+ public Index getIndex() {
+ return index;
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
index e1cab79..f0435cb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestUpdateDeleteSemanticAnalyzer.java
@@ -280,7 +280,7 @@ public class TestUpdateDeleteSemanticAnalyzer {
// validate the plan
sem.validate();
- QueryPlan plan = new QueryPlan(query, sem, 0L, testName, null);
+ QueryPlan plan = new QueryPlan(query, sem, 0L, testName, null, null);
return new ReturnInfo(tree, sem, plan);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/cdd1c7bf/ql/src/test/queries/clientpositive/lineage2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/lineage2.q b/ql/src/test/queries/clientpositive/lineage2.q
new file mode 100644
index 0000000..6bcd1d7
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/lineage2.q
@@ -0,0 +1,116 @@
+set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.LineageLogger;
+
+drop table if exists src2;
+create table src2 as select key key2, value value2 from src1;
+
+select * from src1 where key is not null and value is not null limit 3;
+select * from src1 where key > 10 and value > 'val' order by key limit 5;
+
+drop table if exists dest1;
+create table dest1 as select * from src1;
+insert into table dest1 select * from src2;
+
+select key k, dest1.value from dest1;
+select key from src1 union select key2 from src2 order by key;
+select key k from src1 union select key2 from src2 order by k;
+
+select key, count(1) a from dest1 group by key;
+select key k, count(*) from dest1 group by key;
+select key k, count(value) from dest1 group by key;
+select value, max(length(key)) from dest1 group by value;
+select value, max(length(key)) from dest1 group by value order by value limit 5;
+
+select key, length(value) from dest1;
+select length(value) + 3 from dest1;
+select 5 from dest1;
+select 3 * 5 from dest1;
+
+drop table if exists dest2;
+create table dest2 as select * from src1 JOIN src2 ON src1.key = src2.key2;
+insert overwrite table dest2 select * from src1 JOIN src2 ON src1.key = src2.key2;
+insert into table dest2 select * from src1 JOIN src2 ON src1.key = src2.key2;
+insert into table dest2
+ select * from src1 JOIN src2 ON length(src1.value) = length(src2.value2) + 1;
+
+select * from src1 where length(key) > 2;
+select * from src1 where length(key) > 2 and value > 'a';
+
+drop table if exists dest3;
+create table dest3 as
+ select * from src1 JOIN src2 ON src1.key = src2.key2 WHERE length(key) > 1;
+insert overwrite table dest2
+ select * from src1 JOIN src2 ON src1.key = src2.key2 WHERE length(key) > 3;
+
+drop table if exists dest_l1;
+CREATE TABLE dest_l1(key INT, value STRING) STORED AS TEXTFILE;
+
+INSERT OVERWRITE TABLE dest_l1
+SELECT j.*
+FROM (SELECT t1.key, p1.value
+ FROM src1 t1
+ LEFT OUTER JOIN src p1
+ ON (t1.key = p1.key)
+ UNION ALL
+ SELECT t2.key, p2.value
+ FROM src1 t2
+ LEFT OUTER JOIN src p2
+ ON (t2.key = p2.key)) j;
+
+drop table if exists emp;
+drop table if exists dept;
+drop table if exists project;
+drop table if exists tgt;
+create table emp(emp_id int, name string, mgr_id int, dept_id int);
+create table dept(dept_id int, dept_name string);
+create table project(project_id int, project_name string);
+create table tgt(dept_name string, name string,
+ emp_id int, mgr_id int, proj_id int, proj_name string);
+
+INSERT INTO TABLE tgt
+SELECT emd.dept_name, emd.name, emd.emp_id, emd.mgr_id, p.project_id, p.project_name
+FROM (
+ SELECT d.dept_name, em.name, em.emp_id, em.mgr_id, em.dept_id
+ FROM (
+ SELECT e.name, e.dept_id, e.emp_id emp_id, m.emp_id mgr_id
+ FROM emp e JOIN emp m ON e.emp_id = m.emp_id
+ ) em
+ JOIN dept d ON d.dept_id = em.dept_id
+ ) emd JOIN project p ON emd.dept_id = p.project_id;
+
+drop table if exists dest_l2;
+create table dest_l2 (id int, c1 tinyint, c2 int, c3 bigint) stored as textfile;
+insert into dest_l2 values(0, 1, 100, 10000);
+
+select * from (
+ select c1 + c2 x from dest_l2
+ union all
+ select sum(c3) y from (select c3 from dest_l2) v1) v2 order by x;
+
+drop table if exists dest_l3;
+create table dest_l3 (id int, c1 string, c2 string, c3 int) stored as textfile;
+insert into dest_l3 values(0, "s1", "s2", 15);
+
+select sum(a.c1) over (partition by a.c1 order by a.id)
+from dest_l2 a
+where a.c2 != 10
+group by a.c1, a.c2, a.id
+having count(a.c2) > 0;
+
+select sum(a.c1), count(b.c1), b.c2, b.c3
+from dest_l2 a join dest_l3 b on (a.id = b.id)
+where a.c2 != 10 and b.c3 > 0
+group by a.c1, a.c2, a.id, b.c1, b.c2, b.c3
+having count(a.c2) > 0
+order by b.c3 limit 5;
+
+drop table if exists t;
+create table t as
+select distinct a.c2, a.c3 from dest_l2 a
+inner join dest_l3 b on (a.id = b.id)
+where a.id > 0 and b.c3 = 15;
+
+SELECT substr(src1.key,1,1), count(DISTINCT substr(src1.value,5)),
+concat(substr(src1.key,1,1),sum(substr(src1.value,5)))
+from src1
+GROUP BY substr(src1.key,1,1);
+