Posted to commits@hive.apache.org by ga...@apache.org on 2017/12/18 23:25:55 UTC

[12/50] [abbrv] hive git commit: HIVE-18054: Make Lineage work with concurrent queries on a Session (Andrew Sherman, reviewed by Sahil Takiar)

HIVE-18054: Make Lineage work with concurrent queries on a Session (Andrew Sherman, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/646ccce8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/646ccce8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/646ccce8

Branch: refs/heads/standalone-metastore
Commit: 646ccce8ea3e8c944be164f86dbd5d3428bdbc44
Parents: f52e8b4
Author: Andrew Sherman <as...@cloudera.com>
Authored: Sat Dec 16 15:14:54 2017 -0600
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Sat Dec 16 15:24:11 2017 -0600

----------------------------------------------------------------------
 .../java/org/apache/hive/jdbc/ReadableHook.java |  52 +++++++++
 .../apache/hive/jdbc/TestJdbcWithMiniHS2.java   | 114 +++++++++++++++++++
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  46 ++++++--
 .../org/apache/hadoop/hive/ql/QueryState.java   |  35 +++++-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |   6 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   9 +-
 .../org/apache/hadoop/hive/ql/exec/Task.java    |   4 +
 .../bootstrap/load/table/LoadPartitions.java    |   3 +-
 .../repl/bootstrap/load/table/LoadTable.java    |   3 +-
 .../hadoop/hive/ql/hooks/HookContext.java       |   9 +-
 .../hadoop/hive/ql/hooks/LineageLogger.java     |  56 +++++++--
 .../hive/ql/index/AggregateIndexHandler.java    |   7 +-
 .../hadoop/hive/ql/index/HiveIndexHandler.java  |   6 +-
 .../hive/ql/index/TableBasedIndexHandler.java   |  18 ++-
 .../ql/index/bitmap/BitmapIndexHandler.java     |   8 +-
 .../ql/index/compact/CompactIndexHandler.java   |   8 +-
 .../hive/ql/optimizer/GenMRFileSink1.java       |   2 +-
 .../hive/ql/optimizer/GenMapRedUtils.java       |  27 +++--
 .../hadoop/hive/ql/optimizer/IndexUtils.java    |   6 +-
 .../hive/ql/optimizer/lineage/Generator.java    |   8 +-
 .../hive/ql/parse/DDLSemanticAnalyzer.java      |  15 ++-
 .../hive/ql/parse/ExplainSemanticAnalyzer.java  |   2 +-
 .../hadoop/hive/ql/parse/GenTezUtils.java       |   3 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |   5 +-
 .../hadoop/hive/ql/parse/IndexUpdater.java      |   9 +-
 .../hive/ql/parse/LoadSemanticAnalyzer.java     |   2 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   |   4 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   8 +-
 .../hadoop/hive/ql/parse/TaskCompiler.java      |  11 +-
 .../hive/ql/parse/spark/GenSparkUtils.java      |   2 +-
 .../apache/hadoop/hive/ql/plan/MoveWork.java    |  25 +---
 .../hadoop/hive/ql/session/LineageState.java    |   2 +-
 .../hadoop/hive/ql/session/SessionState.java    |  15 ---
 ...TestGenMapRedUtilsCreateConditionalTask.java |  18 ++-
 34 files changed, 403 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java
new file mode 100644
index 0000000..2dd283f
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.jdbc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+
+/**
+ * An ExecuteWithHookContext that stores HookContexts in memory and makes them available for reading
+ */
+public class ReadableHook implements ExecuteWithHookContext {
+
+  private static List<HookContext> hookList = Collections.synchronizedList(new ArrayList<>());
+
+  @Override
+  public void run(HookContext hookContext) throws Exception {
+    hookList.add(hookContext);
+  }
+
+  /**
+   * @return the stored HookContexts.
+   */
+  public static List<HookContext> getHookList() {
+    return hookList;
+  }
+
+  /**
+   * Clear the stored HookContexts.
+   */
+  public static void clear() {
+    hookList.clear();
+  }
+}
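
A minimal usage sketch (not part of the patch) of how ReadableHook is wired up and read back, mirroring the TestJdbcWithMiniHS2 changes below; only ReadableHook's accessors, ConfVars.POSTEXECHOOKS, HookContext.getIndex() and LineageLogger come from Hive itself, the surrounding setup is assumed:

    // Register the hook as a post-execution hook, run queries, then inspect the
    // captured HookContexts.  Each context carries the per-query lineage index.
    HiveConf conf = new HiveConf();
    conf.setVar(HiveConf.ConfVars.POSTEXECHOOKS,
        ReadableHook.class.getName() + "," + LineageLogger.class.getName());
    // ... start HiveServer2 (or a Driver) with this conf and run some queries ...
    for (HookContext captured : ReadableHook.getHookList()) {
      LineageCtx.Index index = captured.getIndex();  // per-query lineage index
    }
    ReadableHook.clear();  // reset between tests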

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 70bd29c..ffeee69 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -40,6 +40,7 @@ import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -47,6 +48,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
@@ -64,8 +66,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.LineageLogger;
+import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;
 import org.apache.hive.common.util.ReflectionUtil;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.service.cli.HiveSQLException;
@@ -205,6 +211,9 @@ public class TestJdbcWithMiniHS2 {
     conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     conf.setBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false);
     conf.setBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER, false);
+    // store post-exec hook calls so we can look at them later
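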
+    conf.setVar(ConfVars.POSTEXECHOOKS, ReadableHook.class.getName() + "," +
+        LineageLogger.class.getName());
     MiniHS2.Builder builder = new MiniHS2.Builder().withConf(conf).cleanupLocalDirOnStartup(false);
     if (httpMode) {
       builder = builder.withHTTPTransport();
@@ -1503,4 +1512,109 @@ public class TestJdbcWithMiniHS2 {
     stmt.close();
     fsConn.close();
   }
+
+  /**
+   * A test that checks that Lineage is correct when multiple concurrent
+   * requests are made on a connection.
+   */
+  @Test
+  public void testConcurrentLineage() throws Exception {
+    // setup to run concurrent operations
+    Statement stmt = conTestDb.createStatement();
+    setSerializeInTasksInConf(stmt);
+    stmt.execute("drop table if exists testConcurrentLineage1");
+    stmt.execute("drop table if exists testConcurrentLineage2");
+    stmt.execute("create table testConcurrentLineage1 (col1 int)");
+    stmt.execute("create table testConcurrentLineage2 (col2 int)");
+
+    // clear vertices list
+    ReadableHook.clear();
+
+    // run 5 sql inserts concurrently
+    int numThreads = 5;        // set to 1 for single threading
+    int concurrentCalls = 5;
+    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+    try {
+      List<InsertCallable> tasks = new ArrayList<>();
+      for (int i = 0; i < concurrentCalls; i++) {
+        InsertCallable runner = new InsertCallable(conTestDb);
+        tasks.add(runner);
+      }
+      List<Future<Void>> futures = pool.invokeAll(tasks);
+      for (Future<Void> future : futures) {
+        future.get(20, TimeUnit.SECONDS);
+      }
+      // check to see that the vertices are correct
+      checkVertices();
+    } finally {
+      // clean up
+      stmt.execute("drop table testConcurrentLineage1");
+      stmt.execute("drop table testConcurrentLineage2");
+      stmt.close();
+      pool.shutdownNow();
+    }
+  }
+
+  /**
+   * A Callable that does 2 inserts
+   */
+  private class InsertCallable implements Callable<Void> {
+    private Connection connection;
+
+    InsertCallable(Connection conn) {
+      this.connection = conn;
+    }
+
+    @Override public Void call() throws Exception {
+      doLineageInserts(connection);
+      return null;
+    }
+
+    private void doLineageInserts(Connection connection) throws SQLException {
+      Statement stmt = connection.createStatement();
+      stmt.execute("insert into testConcurrentLineage1 values (1)");
+      stmt.execute("insert into testConcurrentLineage2 values (2)");
+    }
+  }
+  /**
+   * check to see that the vertices derived from the HookContexts are correct
+   */
+  private void checkVertices() {
+    List<Set<LineageLogger.Vertex>> verticesLists = getVerticesFromHooks();
+
+    assertEquals("5 runs of 2 inserts makes 10", 10, verticesLists.size());
+    for (Set<LineageLogger.Vertex> vertices : verticesLists) {
+      assertFalse("Each insert affects a column so should be some vertices",
+          vertices.isEmpty());
+      assertEquals("Each insert affects one column so should be one vertex",
+          1, vertices.size());
+      Iterator<LineageLogger.Vertex> iterator = vertices.iterator();
+      assertTrue(iterator.hasNext());
+      LineageLogger.Vertex vertex = iterator.next();
+      assertEquals(0, vertex.getId());
+      assertEquals(LineageLogger.Vertex.Type.COLUMN, vertex.getType());
+      String label = vertex.getLabel();
+      System.out.println("vertex.getLabel() = " + label);
+      assertTrue("did not see one of the 2 expected column names",
+          label.equals("testjdbcminihs2.testconcurrentlineage1.col1") ||
+              label.equals("testjdbcminihs2.testconcurrentlineage2.col2"));
+    }
+  }
+
+  /**
+   * Use the logic in LineageLogger to get vertices from Hook Contexts
+   */
+  private List<Set<LineageLogger.Vertex>>  getVerticesFromHooks() {
+    List<Set<LineageLogger.Vertex>> verticesLists = new ArrayList<>();
+    List<HookContext> hookList = ReadableHook.getHookList();
+    for (HookContext hookContext : hookList) {
+      QueryPlan plan = hookContext.getQueryPlan();
+      LineageCtx.Index index = hookContext.getIndex();
+      assertNotNull(index);
+      List<LineageLogger.Edge> edges = LineageLogger.getEdges(plan, index);
+      Set<LineageLogger.Vertex> vertices = LineageLogger.getVertices(edges);
+      verticesLists.add(vertices);
+    }
+    return verticesLists;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d3df015..b168906 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType;
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.wm.WmContext;
@@ -374,12 +375,20 @@ public class Driver implements CommandProcessor {
     this(getNewQueryState(conf), null);
   }
 
+  // Pass lineageState when a driver instantiates another Driver to run
+  // or compile another query
+  public Driver(HiveConf conf, LineageState lineageState) {
+    this(getNewQueryState(conf, lineageState), null);
+  }
+
   public Driver(HiveConf conf, HiveTxnManager txnMgr) {
     this(getNewQueryState(conf), null, null, txnMgr);
   }
 
-  public Driver(HiveConf conf, Context ctx) {
-    this(getNewQueryState(conf), null, null);
+  // Pass lineageState when a driver instantiates another Driver to run
+  // or compile another query
+  public Driver(HiveConf conf, Context ctx, LineageState lineageState) {
+    this(getNewQueryState(conf, lineageState), null, null);
     this.ctx = ctx;
   }
 
@@ -387,6 +396,12 @@ public class Driver implements CommandProcessor {
     this(getNewQueryState(conf), userName, null);
   }
 
+  // Pass lineageState when a driver instantiates another Driver to run
+  // or compile another query
+  public Driver(HiveConf conf, String userName, LineageState lineageState) {
+    this(getNewQueryState(conf, lineageState), userName, null);
+  }
+
   public Driver(QueryState queryState, String userName) {
     this(queryState, userName, new HooksLoader(queryState.getConf()), null, null);
   }
@@ -425,6 +440,20 @@ public class Driver implements CommandProcessor {
   }
 
   /**
+   * Generating the new QueryState object, making sure that the new queryId is generated.
+   * @param conf The HiveConf which should be used
+   * @param lineageState a LineageState to be set in the new QueryState object
+   * @return The new QueryState object
+   */
+  private static QueryState getNewQueryState(HiveConf conf, LineageState lineageState) {
+    return new QueryState.Builder()
+        .withGenerateNewQueryId(true)
+        .withHiveConf(conf)
+        .withLineageState(lineageState)
+        .build();
+  }
+
+  /**
    * Compile a new query. Any currently-planned query associated with this Driver is discarded.
    * Do not reset id for inner queries(index, etc). Task ids are used for task identity check.
    *
@@ -1336,9 +1365,6 @@ public class Driver implements CommandProcessor {
   private void releaseResources() {
     releasePlan();
     releaseDriverContext();
-    if (SessionState.get() != null) {
-      SessionState.get().getLineageState().clear();
-    }
   }
 
   @Override
@@ -2404,9 +2430,6 @@ public class Driver implements CommandProcessor {
     releaseFetchTask();
     releaseResStream();
     releaseContext();
-    if (SessionState.get() != null) {
-      SessionState.get().getLineageState().clear();
-    }
     if(destroyed) {
       if (!hiveLocks.isEmpty()) {
         try {
@@ -2440,9 +2463,6 @@ public class Driver implements CommandProcessor {
       lDrvState.stateLock.unlock();
       LockedDriverState.removeLockedDriverState();
     }
-    if (SessionState.get() != null) {
-      SessionState.get().getLineageState().clear();
-    }
     return 0;
   }
 
@@ -2504,4 +2524,8 @@ public class Driver implements CommandProcessor {
     releaseResources();
     this.queryState = getNewQueryState(queryState.getConf());
   }
+
+  public QueryState getQueryState() {
+    return queryState;
+  }
 }
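
The new Driver constructors exist so that code which compiles an inner query on behalf of an outer one (the index handlers, EXPLAIN and DDL analyzer hunks below) can hand the outer query's LineageState to the inner Driver instead of reading a session-global one. A minimal sketch, where parentQueryState and innerQuery are hypothetical placeholders:

    // Only Driver(HiveConf, LineageState), QueryState.getLineageState() and the
    // pre-existing compile()/getPlan() calls are Hive APIs here; the rest is assumed.
    HiveConf innerConf = new HiveConf(parentQueryState.getConf());
    Driver innerDriver = new Driver(innerConf, parentQueryState.getLineageState());
    innerDriver.compile(innerQuery, false);
    Task<?> rootTask = innerDriver.getPlan().getRootTasks().get(0);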

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index f3a46db..4f0c165 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 /**
  * The class to store query level info such as queryId. Multiple queries can run
@@ -40,12 +41,17 @@ public class QueryState {
   private HiveOperation commandType;
 
   /**
+   * Per-query Lineage state to track what happens in the query
+   */
+  private LineageState lineageState = new LineageState();
+
+  /**
    * transaction manager used in the query.
    */
   private HiveTxnManager txnManager;
 
   /**
-   * Private constructor, use QueryState.Builder instead
+   * Private constructor, use QueryState.Builder instead.
    * @param conf The query specific configuration object
    */
   private QueryState(HiveConf conf) {
@@ -79,6 +85,14 @@ public class QueryState {
     return queryConf;
   }
 
+  public LineageState getLineageState() {
+    return lineageState;
+  }
+
+  public void setLineageState(LineageState lineageState) {
+    this.lineageState = lineageState;
+  }
+
   public HiveTxnManager getTxnManager() {
     return txnManager;
   }
@@ -95,9 +109,10 @@ public class QueryState {
     private boolean runAsync = false;
     private boolean generateNewQueryId = false;
     private HiveConf hiveConf = null;
+    private LineageState lineageState = null;
 
     /**
-     * Default constructor - use this builder to create a QueryState object
+     * Default constructor - use this builder to create a QueryState object.
      */
     public Builder() {
     }
@@ -149,6 +164,16 @@ public class QueryState {
     }
 
     /**
+     * add a LineageState that will be set in the built QueryState
+     * @param lineageState the source lineageState
+     * @return the builder
+     */
+    public Builder withLineageState(LineageState lineageState) {
+      this.lineageState = lineageState;
+      return this;
+    }
+
+    /**
      * Creates the QueryState object. The default values are:
      * - runAsync false
      * - confOverlay null
@@ -184,7 +209,11 @@ public class QueryState {
         queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, QueryPlan.makeQueryId());
       }
 
-      return new QueryState(queryConf);
+      QueryState queryState = new QueryState(queryConf);
+      if (lineageState != null) {
+        queryState.setLineageState(lineageState);
+      }
+      return queryState;
     }
   }
 }
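
The builder changes above leave QueryState with two paths: by default every new QueryState gets its own LineageState (the field initializer), which is what isolates concurrent queries on the same session, while withLineageState() lets an inner query share its parent's state. A short sketch under those assumptions, with conf as a placeholder HiveConf:

    // Top-level query: a fresh LineageState, independent of any other query.
    QueryState outer = new QueryState.Builder().withHiveConf(conf).build();

    // Inner query (e.g. compiled by an index handler or EXPLAIN): share lineage.
    QueryState inner = new QueryState.Builder()
        .withGenerateNewQueryId(true)
        .withHiveConf(conf)
        .withLineageState(outer.getLineageState())
        .build();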

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 55ef8de..05041cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4478,7 +4478,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       }
     }
     // Don't set inputs and outputs - the locks have already been taken so it's pointless.
-    MoveWork mw = new MoveWork(null, null, null, null, false, SessionState.get().getLineageState());
+    MoveWork mw = new MoveWork(null, null, null, null, false);
     mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null));
     ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId);
     Task<?> mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf);
@@ -4909,7 +4909,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
         Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName());
         if (crtTbl.isCTAS()) {
           DataContainer dc = new DataContainer(createdTable.getTTable());
-          SessionState.get().getLineageState().setLineage(
+          queryState.getLineageState().setLineage(
                   createdTable.getPath(), dc, createdTable.getCols()
           );
         }
@@ -5137,7 +5137,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
 
       //set lineage info
       DataContainer dc = new DataContainer(tbl.getTTable());
-      SessionState.get().getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols());
+      queryState.getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols());
     }
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index f5a5e71..8387208 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -398,7 +398,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             dc = handleStaticParts(db, table, tbd, ti);
           }
         }
-        if (work.getLineagState() != null && dc != null) {
+        if (dc != null) {
           // If we are doing an update or a delete the number of columns in the table will not
           // match the number of columns in the file sink.  For update there will be one too many
           // (because of the ROW__ID), and in the case of the delete there will be just the
@@ -416,7 +416,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
               tableCols = table.getCols();
               break;
           }
-          work.getLineagState().setLineage(tbd.getSourcePath(), dc, tableCols);
+          queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols);
         }
         releaseLocks(tbd);
       }
@@ -552,10 +552,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       dc = new DataContainer(table.getTTable(), partn.getTPartition());
 
       // Don't set lineage on delete as we don't have all the columns
-      if (work.getLineagState() != null &&
-          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
+      if (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
           work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
-        work.getLineagState().setLineage(tbd.getSourcePath(), dc,
+        queryState.getLineageState().setLineage(tbd.getSourcePath(), dc,
             table.getCols());
       }
       LOG.info("Loading partition " + entry.getKey());

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 1f0487f..d75fcf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -649,4 +649,8 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
     return true;
   }
 
+  public QueryState getQueryState() {
+    return queryState;
+  }
+
 }
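
Task.getQueryState() is what lets execution-time code reach the per-query LineageState without going through SessionState or carrying it on the work object. MoveTask and DDLTask in the hunks above use the protected queryState field directly, along the lines of this sketch (tbd, dc and table stand in for the local variables used in those hunks):

    // Inside a Task subclass such as MoveTask:
    queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, table.getCols());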

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 262225f..1a542e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -245,8 +245,7 @@ public class LoadPartitions {
         SessionState.get().getTxnMgr().getCurrentTxnId()
     );
     loadTableWork.setInheritTableSpecs(false);
-    MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false,
-        context.sessionStateLineageState);
+    MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
     return TaskFactory.get(work, context.hiveConf);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 545b7a8..f5125a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -233,8 +233,7 @@ public class LoadTable {
         SessionState.get().getTxnMgr().getCurrentTxnId()
     );
     MoveWork moveWork =
-        new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false,
-            context.sessionStateLineageState);
+        new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
     Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
     copyTask.addDependentTask(loadTableTask);
     return copyTask;

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
index 7b61730..93f1da7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -88,12 +87,8 @@ public class HookContext {
     inputs = queryPlan.getInputs();
     outputs = queryPlan.getOutputs();
     ugi = Utils.getUGI();
-    linfo= null;
-    depMap = null;
-    if(SessionState.get() != null){
-      linfo = SessionState.get().getLineageState().getLineageInfo();
-      depMap = SessionState.get().getLineageState().getIndex();
-    }
+    linfo = queryState.getLineageState().getLineageInfo();
+    depMap = queryState.getLineageState().getIndex();
     this.userName = userName;
     this.ipAddress = ipAddress;
     this.hiveInstanceAddress = hiveInstanceAddress;

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
index 2f764f8..06eb9c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.hooks;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
@@ -74,7 +75,15 @@ public class LineageLogger implements ExecuteWithHookContext {
 
   private static final String FORMAT_VERSION = "1.0";
 
-  final static class Edge {
+  /**
+   * An edge in lineage.
+   */
+  @VisibleForTesting
+  public static final class Edge {
+
+    /**
+     * The types of Edge.
+     */
     public static enum Type {
       PROJECTION, PREDICATE
     }
@@ -92,7 +101,15 @@ public class LineageLogger implements ExecuteWithHookContext {
     }
   }
 
-  final static class Vertex {
+  /**
+   * A vertex in lineage.
+   */
+  @VisibleForTesting
+  public static final class Vertex {
+
+    /**
+     * A type in lineage.
+     */
     public static enum Type {
       COLUMN, TABLE
     }
@@ -125,6 +142,21 @@ public class LineageLogger implements ExecuteWithHookContext {
       Vertex vertex = (Vertex) obj;
       return label.equals(vertex.label) && type == vertex.type;
     }
+
+    @VisibleForTesting
+    public Type getType() {
+      return type;
+    }
+
+    @VisibleForTesting
+    public String getLabel() {
+      return label;
+    }
+
+    @VisibleForTesting
+    public int getId() {
+      return id;
+    }
   }
 
   @Override
@@ -203,7 +235,7 @@ public class LineageLogger implements ExecuteWithHookContext {
   /**
    * Logger an error to console if available.
    */
-  private void log(String error) {
+  private static void log(String error) {
     LogHelper console = SessionState.getConsole();
     if (console != null) {
       console.printError(error);
@@ -214,7 +246,8 @@ public class LineageLogger implements ExecuteWithHookContext {
    * Based on the final select operator, find out all the target columns.
    * For each target column, find out its sources based on the dependency index.
    */
-  private List<Edge> getEdges(QueryPlan plan, Index index) {
+  @VisibleForTesting
+  public static List<Edge> getEdges(QueryPlan plan, Index index) {
     LinkedHashMap<String, ObjectPair<SelectOperator,
       org.apache.hadoop.hive.ql.metadata.Table>> finalSelOps = index.getFinalSelectOps();
     Map<String, Vertex> vertexCache = new LinkedHashMap<String, Vertex>();
@@ -292,7 +325,7 @@ public class LineageLogger implements ExecuteWithHookContext {
     return edges;
   }
 
-  private void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges,
+  private static void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges,
       Set<BaseColumnInfo> srcCols, Vertex target, String expr, Edge.Type type) {
     Set<Vertex> targets = new LinkedHashSet<Vertex>();
     targets.add(target);
@@ -304,7 +337,7 @@ public class LineageLogger implements ExecuteWithHookContext {
    * If found, add the more targets to this edge's target vertex list.
    * Otherwise, create a new edge and add to edge list.
    */
-  private void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges,
+  private static void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges,
       Set<BaseColumnInfo> srcCols, Set<Vertex> targets, String expr, Edge.Type type) {
     Set<Vertex> sources = createSourceVertices(vertexCache, srcCols);
     Edge edge = findSimilarEdgeBySources(edges, sources, expr, type);
@@ -319,7 +352,7 @@ public class LineageLogger implements ExecuteWithHookContext {
    * Convert a list of columns to a set of vertices.
    * Use cached vertices if possible.
    */
-  private Set<Vertex> createSourceVertices(
+  private static Set<Vertex> createSourceVertices(
       Map<String, Vertex> vertexCache, Collection<BaseColumnInfo> baseCols) {
     Set<Vertex> sources = new LinkedHashSet<Vertex>();
     if (baseCols != null && !baseCols.isEmpty()) {
@@ -346,7 +379,7 @@ public class LineageLogger implements ExecuteWithHookContext {
   /**
    * Find a vertex from a cache, or create one if not.
    */
-  private Vertex getOrCreateVertex(
+  private static Vertex getOrCreateVertex(
       Map<String, Vertex> vertices, String label, Vertex.Type type) {
     Vertex vertex = vertices.get(label);
     if (vertex == null) {
@@ -359,7 +392,7 @@ public class LineageLogger implements ExecuteWithHookContext {
   /**
    * Find an edge that has the same type, expression, and sources.
    */
-  private Edge findSimilarEdgeBySources(
+  private static Edge findSimilarEdgeBySources(
       List<Edge> edges, Set<Vertex> sources, String expr, Edge.Type type) {
     for (Edge edge: edges) {
       if (edge.type == type && StringUtils.equals(edge.expr, expr)
@@ -373,7 +406,7 @@ public class LineageLogger implements ExecuteWithHookContext {
   /**
    * Generate normalized name for a given target column.
    */
-  private String getTargetFieldName(int fieldIndex,
+  private static String getTargetFieldName(int fieldIndex,
       String destTableName, List<String> colNames, List<FieldSchema> fieldSchemas) {
     String fieldName = fieldSchemas.get(fieldIndex).getName();
     String[] parts = fieldName.split("\\.");
@@ -394,7 +427,8 @@ public class LineageLogger implements ExecuteWithHookContext {
    * Get all the vertices of all edges. Targets at first,
    * then sources. Assign id to each vertex.
    */
-  private Set<Vertex> getVertices(List<Edge> edges) {
+  @VisibleForTesting
+  public static Set<Vertex> getVertices(List<Edge> edges) {
     Set<Vertex> vertices = new LinkedHashSet<Vertex>();
     for (Edge edge: edges) {
       vertices.addAll(edge.targets);
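
Because getEdges() and getVertices() are now public and static (and marked @VisibleForTesting), any code holding a HookContext can rebuild the lineage graph directly; this is exactly what getVerticesFromHooks() does in the TestJdbcWithMiniHS2 change earlier in this patch:

    // Sketch mirroring TestJdbcWithMiniHS2.getVerticesFromHooks():
    QueryPlan plan = hookContext.getQueryPlan();
    LineageCtx.Index index = hookContext.getIndex();
    List<LineageLogger.Edge> edges = LineageLogger.getEdges(plan, index);
    Set<LineageLogger.Vertex> vertices = LineageLogger.getVertices(edges);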

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
index 68709b4..bf06723 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 /**
  * Index handler for indexes that have aggregate functions on indexed columns.
@@ -90,7 +90,8 @@ public class AggregateIndexHandler extends CompactIndexHandler {
         Set<WriteEntity> outputs,
         Index index, boolean partitioned,
         PartitionDesc indexTblPartDesc, String indexTableName,
-        PartitionDesc baseTablePartDesc, String baseTableName, String dbName) {
+        PartitionDesc baseTablePartDesc, String baseTableName, String dbName,
+        LineageState lineageState) {
 
       List<FieldSchema> indexField = index.getSd().getCols();
       String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
@@ -152,7 +153,7 @@ public class AggregateIndexHandler extends CompactIndexHandler {
       builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
       builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
       Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
-          command, (LinkedHashMap<String, String>) partSpec, indexTableName, dbName);
+          command, (LinkedHashMap<String, String>) partSpec, indexTableName, dbName, lineageState);
       return rootTask;
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java
index 1e577da..b6c0252 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 /**
  * HiveIndexHandler defines a pluggable interface for adding new index handlers
@@ -99,6 +100,9 @@ public interface HiveIndexHandler extends Configurable {
    *          outputs for hooks, supplemental outputs going
    *          along with the return value
    *
+   * @param lineageState
+   *          tracks Lineage for the query
+   *
    * @return list of tasks to be executed in parallel for building the index
    *
    * @throws HiveException if plan generation fails
@@ -108,7 +112,7 @@ public interface HiveIndexHandler extends Configurable {
       org.apache.hadoop.hive.metastore.api.Index index,
       List<Partition> indexTblPartitions, List<Partition> baseTblPartitions,
       org.apache.hadoop.hive.ql.metadata.Table indexTbl,
-      Set<ReadEntity> inputs, Set<WriteEntity> outputs)
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs,  LineageState lineageState)
       throws HiveException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
index 29886ae..744ac29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 /**
  * Index handler for indexes that use tables to store indexes.
@@ -51,7 +52,8 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler {
       org.apache.hadoop.hive.metastore.api.Index index,
       List<Partition> indexTblPartitions, List<Partition> baseTblPartitions,
       org.apache.hadoop.hive.ql.metadata.Table indexTbl,
-      Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws HiveException {
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+      LineageState lineageState) throws HiveException {
     try {
 
       TableDesc desc = Utilities.getTableDesc(indexTbl);
@@ -66,7 +68,7 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler {
         Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, false,
             new PartitionDesc(desc, null), indexTbl.getTableName(),
             new PartitionDesc(Utilities.getTableDesc(baseTbl), null),
-            baseTbl.getTableName(), indexTbl.getDbName());
+            baseTbl.getTableName(), indexTbl.getDbName(), lineageState);
         indexBuilderTasks.add(indexBuilder);
       } else {
 
@@ -89,7 +91,8 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler {
           // for each partition, spawn a map reduce task.
           Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, true,
               new PartitionDesc(indexPart), indexTbl.getTableName(),
-              new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName());
+              new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName(),
+              lineageState);
           indexBuilderTasks.add(indexBuilder);
         }
       }
@@ -102,15 +105,18 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler {
   protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
       Index index, boolean partitioned,
       PartitionDesc indexTblPartDesc, String indexTableName,
-      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+      PartitionDesc baseTablePartDesc, String baseTableName, String dbName,
+      LineageState lineageState) throws HiveException {
     return getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(),
-        partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName);
+        partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName,
+        lineageState);
   }
 
   protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
       List<FieldSchema> indexField, boolean partitioned,
       PartitionDesc indexTblPartDesc, String indexTableName,
-      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+      PartitionDesc baseTablePartDesc, String baseTableName, String dbName,
+      LineageState lineageState) throws HiveException {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
index 7b067a0..9117159 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
@@ -115,7 +116,7 @@ public class BitmapIndexHandler extends TableBasedIndexHandler {
     LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString());
     HiveConf queryConf = new HiveConf(pctx.getConf(), BitmapIndexHandler.class);
     HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false);
-    Driver driver = new Driver(queryConf);
+    Driver driver = new Driver(queryConf, pctx.getQueryState().getLineageState());
     driver.compile(qlCommand.toString(), false);
 
     queryContext.setIndexIntermediateFile(tmpFile);
@@ -222,7 +223,8 @@ public class BitmapIndexHandler extends TableBasedIndexHandler {
   protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
       List<FieldSchema> indexField, boolean partitioned,
       PartitionDesc indexTblPartDesc, String indexTableName,
-      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+      PartitionDesc baseTablePartDesc, String baseTableName, String dbName,
+      LineageState lineageState) throws HiveException {
 
     HiveConf builderConf = new HiveConf(getConf(), BitmapIndexHandler.class);
     HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEROWOFFSET, true);
@@ -290,7 +292,7 @@ public class BitmapIndexHandler extends TableBasedIndexHandler {
     }
 
     Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
-        command, partSpec, indexTableName, dbName);
+        command, partSpec, indexTableName, dbName, lineageState);
     return rootTask;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
index 504b062..73278cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
@@ -94,7 +95,8 @@ public class CompactIndexHandler extends TableBasedIndexHandler {
   protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
       List<FieldSchema> indexField, boolean partitioned,
       PartitionDesc indexTblPartDesc, String indexTableName,
-      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+      PartitionDesc baseTablePartDesc, String baseTableName, String dbName,
+      LineageState lineageState) throws HiveException {
 
     String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
 
@@ -150,7 +152,7 @@ public class CompactIndexHandler extends TableBasedIndexHandler {
     builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
     builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
     Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
-        command, partSpec, indexTableName, dbName);
+        command, partSpec, indexTableName, dbName, lineageState);
     return rootTask;
   }
 
@@ -189,7 +191,7 @@ public class CompactIndexHandler extends TableBasedIndexHandler {
     LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString());
     HiveConf queryConf = new HiveConf(pctx.getConf(), CompactIndexHandler.class);
     HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false);
-    Driver driver = new Driver(queryConf);
+    Driver driver = new Driver(queryConf, pctx.getQueryState().getLineageState());
     driver.compile(qlCommand.toString(), false);
 
     if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
index d7a83f7..bb42dde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
@@ -112,7 +112,7 @@ public class GenMRFileSink1 implements NodeProcessor {
       LOG.info("using CombineHiveInputformat for the merge job");
       GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName,
           ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(),
-          hconf, currTask);
+          hconf, currTask, parseCtx.getQueryState().getLineageState());
     }
 
     FileSinkDesc fileSinkDesc = fsOp.getConf();

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index bdaf105..a0b2678 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1229,6 +1229,7 @@ public final class GenMapRedUtils {
    * @param mvTasks
    * @param conf
    * @param currTask
+   * @param lineageState
    * @throws SemanticException
 
    * create a Map-only merge job using CombineHiveInputFormat for all partitions with
@@ -1257,10 +1258,11 @@ public final class GenMapRedUtils {
    *          directories.
    *
    */
-  public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
-   Path finalName, DependencyCollectionTask dependencyTask,
-   List<Task<MoveWork>> mvTasks, HiveConf conf,
-   Task<? extends Serializable> currTask) throws SemanticException {
+  public static void createMRWorkForMergingFiles(FileSinkOperator fsInput,
+      Path finalName, DependencyCollectionTask dependencyTask,
+      List<Task<MoveWork>> mvTasks, HiveConf conf,
+      Task<? extends Serializable> currTask, LineageState lineageState)
+      throws SemanticException {
 
     //
     // 1. create the operator tree
@@ -1370,8 +1372,7 @@ public final class GenMapRedUtils {
     if (srcMmWriteId == null) {
       // Only create the movework for non-MM table. No action needed for a MM table.
       dummyMv = new MoveWork(null, null, null,
-          new LoadFileDesc(inputDirName, finalName, true, null, null, false), false,
-          SessionState.get().getLineageState());
+          new LoadFileDesc(inputDirName, finalName, true, null, null, false), false);
     }
     // Use the original fsOp path here in case of MM - while the new FSOP merges files inside the
     // MM directory, the original MoveTask still commits based on the parent. Note that this path
@@ -1382,7 +1383,7 @@ public final class GenMapRedUtils {
     Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(
         mvTasks, fsopPath, fsInputDesc.isMmTable());
     ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
-        fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask);
+        fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask, lineageState);
 
     // keep the dynamic partition context in conditional task resolver context
     ConditionalResolverMergeFilesCtx mrCtx =
@@ -1730,15 +1731,16 @@ public final class GenMapRedUtils {
    *
    * @param condInputPath A path that the ConditionalTask uses as input for its sub-tasks.
    * @param linkedMoveWork A MoveWork that the ConditionalTask uses to link to its sub-tasks.
+   * @param lineageState A LineageState used to track what changes.
    * @return A new MoveWork that has the Conditional input path as source and the linkedMoveWork as target.
    */
   @VisibleForTesting
-  protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork) {
+  protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork,
+      LineageState lineageState) {
     MoveWork newWork = new MoveWork(linkedMoveWork);
     LoadFileDesc fileDesc = null;
     LoadTableDesc tableDesc = null;
 
-    LineageState lineageState = SessionState.get().getLineageState();
     if (linkedMoveWork.getLoadFileWork() != null) {
       fileDesc = new LoadFileDesc(linkedMoveWork.getLoadFileWork());
       fileDesc.setSourcePath(condInputPath);
@@ -1776,13 +1778,15 @@ public final class GenMapRedUtils {
    *          a MoveTask that may be linked to the conditional sub-tasks
    * @param dependencyTask
    *          a dependency task that may be linked to the conditional sub-tasks
+   * @param lineageState
+   *          to track activity
    * @return The conditional task
    */
   @SuppressWarnings("unchecked")
   private static ConditionalTask createCondTask(HiveConf conf,
       Task<? extends Serializable> currTask, MoveWork mvWork, Serializable mergeWork,
       Path condInputPath, Path condOutputPath, Task<MoveWork> moveTaskToLink,
-      DependencyCollectionTask dependencyTask) {
+      DependencyCollectionTask dependencyTask, LineageState lineageState) {
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("Creating conditional merge task for " + condInputPath);
     }
@@ -1795,7 +1799,8 @@ public final class GenMapRedUtils {
 
     Serializable workForMoveOnlyTask = moveWork;
     if (shouldMergeMovePaths) {
-      workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork());
+      workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork(),
+          lineageState);
     }
 
     // There are 3 options for this ConditionalTask:

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
index 338c185..f69c9a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 /**
  * Utility class for index support.
@@ -221,10 +222,11 @@ public final class IndexUtils {
       StringBuilder command,
       LinkedHashMap<String, String> partSpec,
       String indexTableName,
-      String dbName){
+      String dbName,
+      LineageState lineageState){
     // Don't try to index optimize the query to build the index
     HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER, false);
-    Driver driver = new Driver(builderConf, SessionState.get().getUserName());
+    Driver driver = new Driver(builderConf, SessionState.get().getUserName(), lineageState);
     driver.compile(command.toString(), false);
 
     Task<?> rootTask = driver.getPlan().getRootTasks().get(0);

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
index e6c0771..0d72a1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.optimizer.Transform;
 import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,9 +84,10 @@ public class Generator extends Transform {
         return pctx;
       }
     }
-
-    Index index = SessionState.get() != null ?
-      SessionState.get().getLineageState().getIndex() : new Index();
+    Index index = pctx.getQueryState().getLineageState().getIndex();
+    if (index == null) {
+      index = new Index();
+    }
 
     long sTime = System.currentTimeMillis();
     // Create the lineage context

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index a09b796..971a061 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -158,6 +158,7 @@ import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 import org.apache.hadoop.hive.ql.plan.CreateOrAlterWMPoolDesc;
 import org.apache.hadoop.hive.ql.plan.CreateOrDropTriggerToPoolMappingDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -1485,8 +1486,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
             partSpec == null ? new HashMap<>() : partSpec, null);
         ltd.setLbCtx(lbCtx);
         @SuppressWarnings("unchecked")
-        Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(
-          null, null, ltd, null, false, SessionState.get().getLineageState()), conf);
+        Task<MoveWork> moveTsk =
+            TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
         truncateTask.addDependentTask(moveTsk);
 
         // Recalculate the HDFS stats if auto gather stats is set
@@ -1703,8 +1704,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
             indexTbl, db, indexTblPartitions);
       }
 
+      LineageState lineageState = queryState.getLineageState();
       List<Task<?>> ret = handler.generateIndexBuildTaskList(baseTbl,
-          index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs());
+          index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs(),
+          lineageState);
       return ret;
     } catch (Exception e) {
       throw new SemanticException(e);
@@ -2146,8 +2149,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
       LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
           partSpec == null ? new HashMap<>() : partSpec, null);
       ltd.setLbCtx(lbCtx);
-      Task<MoveWork> moveTsk = TaskFactory.get(
-        new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf);
+      Task<MoveWork> moveTsk =
+          TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
       mergeTask.addDependentTask(moveTsk);
 
       if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
@@ -3539,7 +3542,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
       SessionState ss = SessionState.get();
       String uName = (ss == null? null: ss.getUserName());
-      Driver driver = new Driver(conf, uName);
+      Driver driver = new Driver(conf, uName, queryState.getLineageState());
       int rc = driver.compile(cmd.toString(), false);
       if (rc != 0) {
         throw new SemanticException(ErrorMsg.NO_VALID_PARTN.getMsg());

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index 065c7e5..0eacfc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -134,7 +134,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
         runCtx = new Context(conf);
         // runCtx and ctx share the configuration, but not isExplainPlan()
         runCtx.setExplainConfig(config);
-        Driver driver = new Driver(conf, runCtx);
+        Driver driver = new Driver(conf, runCtx, queryState.getLineageState());
         CommandProcessorResponse ret = driver.run(query);
         if(ret.getResponseCode() == 0) {
           // Note that we need to call getResults for simple fetch optimization.

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index b6f1139..e6d4cbe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -385,7 +385,8 @@ public class GenTezUtils {
           + fileSink.getConf().getDirName() + " to " + finalName);
       GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
           context.dependencyTask, context.moveTask,
-          hconf, context.currentTask);
+          hconf, context.currentTask,
+          parseContext.getQueryState().getLineageState());
     }
 
     FetchTask fetchTask = parseContext.getFetchTask();

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 83d53bc..c79df56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -391,7 +391,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         Utilities.getTableDesc(table), new TreeMap<>(),
         replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId);
     loadTableWork.setStmtId(stmtId);
-    MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState());
+    MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork,
+        null, false);
     Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
     copyTask.addDependentTask(loadTableTask);
     x.getTasks().add(copyTask);
@@ -495,7 +496,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       loadTableWork.setStmtId(stmtId);
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
-          x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), x.getConf());
+          x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf());
       copyTask.addDependentTask(loadPartTask);
       addPartTask.addDependentTask(loadPartTask);
       x.getTasks().add(copyTask);

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
index f31775e..ccf1e66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 import java.io.Serializable;
 import java.util.LinkedList;
@@ -47,12 +48,14 @@ public class IndexUpdater {
   private Hive hive;
   private List<Task<? extends Serializable>> tasks;
   private Set<ReadEntity> inputs;
+  private LineageState lineageState;
 
-
-  public IndexUpdater(List<LoadTableDesc> loadTableWork, Set<ReadEntity> inputs, Configuration conf) {
+  public IndexUpdater(List<LoadTableDesc> loadTableWork, Set<ReadEntity> inputs, Configuration conf,
+      LineageState lineageState) {
     this.loadTableWork = loadTableWork;
     this.inputs = inputs;
     this.conf = new HiveConf(conf, IndexUpdater.class);
+    this.lineageState = lineageState;
     this.tasks = new LinkedList<Task<? extends Serializable>>();
   }
 
@@ -133,7 +136,7 @@ public class IndexUpdater {
   }
 
   private void compileRebuild(String query) {
-    Driver driver = new Driver(this.conf);
+    Driver driver = new Driver(this.conf, lineageState);
     driver.compile(query, false);
     tasks.addAll(driver.getPlan().getRootTasks());
     inputs.addAll(driver.getPlan().getInputs());
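
To tie the IndexUpdater changes together: the updater now receives the query's LineageState at construction time and hands it to the Driver it uses to compile index-rebuild statements. A sketch under those assumptions; the wrapper class IndexUpdateSketch and method buildIndexUpdateTasks are hypothetical, while the constructor and generateUpdateTasks calls come from the hunks in this commit.

import java.io.Serializable;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.parse.IndexUpdater;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.session.LineageState;

public final class IndexUpdateSketch {
  // Build the auto-update tasks for stale indexes, threading the query's
  // LineageState through to the Drivers that compile the rebuild queries.
  static List<Task<? extends Serializable>> buildIndexUpdateTasks(
      List<LoadTableDesc> loadTableWork, Set<ReadEntity> inputs,
      Configuration conf, LineageState lineageState) throws Exception {
    IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf, lineageState);
    return indexUpdater.generateUpdateTasks();
  }
}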

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index cc956da..e600f7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -302,7 +302,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     Task<? extends Serializable> childTask = TaskFactory.get(
         new MoveWork(getInputs(), getOutputs(), loadTableWork, null, true,
-            isLocal, SessionState.get().getLineageState()), conf
+            isLocal), conf
     );
     if (rTask != null) {
       rTask.addDependentTask(childTask);

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 498b674..80556ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -313,7 +313,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
         ReplLoadWork replLoadWork =
             new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern,
-                SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
+                queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
         rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
         return;
       }
@@ -344,7 +344,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
 
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-            SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
+            queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
         rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
         //
         //        for (FileStatus dir : dirsInLoadPath) {

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 28e3621..dcda8b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7336,8 +7336,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
   private void handleLineage(LoadTableDesc ltd, Operator output)
       throws SemanticException {
-    if (ltd != null && SessionState.get() != null) {
-      SessionState.get().getLineageState()
+    if (ltd != null) {
+      queryState.getLineageState()
           .mapDirToOp(ltd.getSourcePath(), output);
     } else if ( queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) {
 
@@ -7350,7 +7350,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         throw new SemanticException(e);
       }
 
-      SessionState.get().getLineageState()
+      queryState.getLineageState()
               .mapDirToOp(tlocation, output);
     }
   }
@@ -11685,7 +11685,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
             pCtx = t.transform(pCtx);
           }
           // we just use view name as location.
-          SessionState.get().getLineageState()
+          queryState.getLineageState()
               .mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp);
         }
         return;
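
The handleLineage hunks capture the core idea of the commit: lineage registration is keyed off the per-query QueryState instead of the session-global SessionState, so concurrent queries on one session each keep their own directory-to-operator mapping. A condensed before/after sketch (fragment only; queryState, ltd and output are the fields and parameters visible in the hunk):

    // Before: session-scoped state, shared across concurrent queries.
    //   SessionState.get().getLineageState().mapDirToOp(ltd.getSourcePath(), output);
    // After: query-scoped state obtained from QueryState.
    if (ltd != null) {
      queryState.getLineageState().mapDirToOp(ltd.getSourcePath(), output);
    }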

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 7b29370..24559b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -105,7 +106,8 @@ public abstract class TaskCompiler {
   }
 
   @SuppressWarnings({"nls", "unchecked"})
-  public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
+  public void compile(final ParseContext pCtx,
+      final List<Task<? extends Serializable>> rootTasks,
       final HashSet<ReadEntity> inputs, final HashSet<WriteEntity> outputs) throws SemanticException {
 
     Context ctx = pCtx.getContext();
@@ -218,12 +220,13 @@ public abstract class TaskCompiler {
     } else if (!isCStats) {
       for (LoadTableDesc ltd : loadTableWork) {
         Task<MoveWork> tsk = TaskFactory
-            .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()),
+            .get(new MoveWork(null, null, ltd, null, false),
                 conf);
         mvTask.add(tsk);
         // Check to see if we are stale'ing any indexes and auto-update them if we want
         if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
-          IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf);
+          IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf,
+              queryState.getLineageState());
           try {
             List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater
                 .generateUpdateTasks();
@@ -248,7 +251,7 @@ public abstract class TaskCompiler {
           oneLoadFileForCtas = false;
         }
         mvTask.add(TaskFactory
-            .get(new MoveWork(null, null, null, lfd, false, SessionState.get().getLineageState()),
+            .get(new MoveWork(null, null, null, lfd, false),
                 conf));
       }
     }
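
With the LineageState removed from MoveWork (see the MoveWork.java hunks below), move tasks are now constructed from the load descriptors alone, and the lineage object is passed only where compilation genuinely needs it, such as the IndexUpdater. A small sketch of the new call site, using only constructors shown in this commit; the wrapper class MoveTaskSketch is hypothetical.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;

public final class MoveTaskSketch {
  // Create the move task for a table load; MoveWork no longer carries lineage.
  static Task<MoveWork> createMoveTask(LoadTableDesc ltd, HiveConf conf) {
    return TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
  }
}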

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 604c8ae..c6c7bf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -385,7 +385,7 @@ public class GenSparkUtils {
       LOG.info("using CombineHiveInputformat for the merge job");
       GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
           context.dependencyTask, context.moveTask,
-          hconf, context.currentTask);
+          hconf, context.currentTask, parseContext.getQueryState().getLineageState());
     }
 
     FetchTask fetchTask = parseContext.getFetchTask();

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 28a3374..49fe540 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
-import org.apache.hadoop.hive.ql.session.LineageState;
 
 /**
  * MoveWork.
@@ -39,13 +38,6 @@ public class MoveWork implements Serializable {
   private LoadTableDesc loadTableWork;
   private LoadFileDesc loadFileWork;
   private LoadMultiFilesDesc loadMultiFilesWork;
-  /*
-  these are sessionState objects that are copied over to work to allow for parallel execution.
-  based on the current use case the methods are selectively synchronized, which might need to be
-  taken care when using other methods.
-   */
-  private final LineageState sessionStateLineageState;
-
   private boolean checkFileFormat;
   private boolean srcLocal;
 
@@ -65,21 +57,18 @@ public class MoveWork implements Serializable {
   private boolean isNoop;
 
   public MoveWork() {
-    sessionStateLineageState = null;
   }
 
 
-  private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
-      LineageState lineageState) {
+  private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
     this.inputs = inputs;
     this.outputs = outputs;
-    sessionStateLineageState = lineageState;
   }
 
   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-      boolean checkFileFormat, boolean srcLocal, LineageState lineageState) {
-    this(inputs, outputs, lineageState);
+      boolean checkFileFormat, boolean srcLocal) {
+    this(inputs, outputs);
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
       Utilities.FILE_OP_LOGGER.trace("Creating MoveWork " + System.identityHashCode(this)
         + " with " + loadTableWork + "; " + loadFileWork);
@@ -92,8 +81,8 @@ public class MoveWork implements Serializable {
 
   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-      boolean checkFileFormat, LineageState lineageState) {
-    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false, lineageState);
+      boolean checkFileFormat) {
+    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false);
   }
 
   public MoveWork(final MoveWork o) {
@@ -104,7 +93,6 @@ public class MoveWork implements Serializable {
     srcLocal = o.isSrcLocal();
     inputs = o.getInputs();
     outputs = o.getOutputs();
-    sessionStateLineageState = o.sessionStateLineageState;
   }
 
   @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -166,7 +154,4 @@ public class MoveWork implements Serializable {
     this.srcLocal = srcLocal;
   }
   
-  public LineageState getLineagState() {
-    return sessionStateLineageState;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
index 056d614..82eeb35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
@@ -60,7 +60,7 @@ public class LineageState implements Serializable {
   /**
    * Constructor.
    */
-  LineageState() {
+  public LineageState() {
     dirToFop = new HashMap<>();
     linfo = new LineageInfo();
     index = new Index();
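
Making the LineageState constructor public is what lets callers outside the session package, such as the updated test below, create a per-query instance directly instead of borrowing the one previously embedded in SessionState. A usage sketch matching that test; the wrapper class MergeMovePathsSketch is hypothetical, and condInputPath and mockWork stand in for whatever Path and MoveWork the caller already has.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.session.LineageState;

public final class MergeMovePathsSketch {
  // Merge the conditional-task input path into a copy of the move work,
  // passing an explicitly created, query-scoped LineageState.
  static MoveWork merge(Path condInputPath, MoveWork mockWork) {
    LineageState lineageState = new LineageState();
    return GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
  }
}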

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index bb6ddc6..d03f5e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -232,11 +232,6 @@ public class SessionState {
    */
   private Map<URI, HadoopShims.HdfsEncryptionShim> hdfsEncryptionShims = Maps.newHashMap();
 
-  /**
-   * Lineage state.
-   */
-  LineageState ls;
-
   private final String userName;
 
   /**
@@ -294,15 +289,6 @@ public class SessionState {
 
   private List<Closeable> cleanupItems = new LinkedList<Closeable>();
 
-  /**
-   * Get the lineage state stored in this session.
-   *
-   * @return LineageState
-   */
-  public LineageState getLineageState() {
-    return ls;
-  }
-
   public HiveConf getConf() {
     return sessionConf;
   }
@@ -387,7 +373,6 @@ public class SessionState {
       LOG.debug("SessionState user: " + userName);
     }
     isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT);
-    ls = new LineageState();
     resourceMaps = new ResourceMaps();
     // Must be deterministic order map for consistent q-test output across Java versions
     overriddenConfigurations = new LinkedHashMap<String, String>();

http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
index 3406892..3c007a7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -135,9 +136,10 @@ public class TestGenMapRedUtilsCreateConditionalTask {
   public void testMergePathWithInvalidMoveWorkThrowsException() {
     final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
     final MoveWork mockWork = mock(MoveWork.class);
+    final LineageState lineageState = new LineageState();
 
     when(mockWork.getLoadMultiFilesWork()).thenReturn(new LoadMultiFilesDesc());
-    GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
   }
 
   @Test
@@ -146,12 +148,13 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
     final Path targetMoveWorkPath = new Path("s3a://bucket/scratch/-ext-10003");
     final MoveWork mockWork = mock(MoveWork.class);
+    final LineageState lineageState = new LineageState();
     MoveWork newWork;
 
     // test using loadFileWork
     when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(
         condOutputPath, targetMoveWorkPath, false, "", "", false));
-    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
     assertNotNull(newWork);
     assertNotEquals(newWork, mockWork);
     assertEquals(condInputPath, newWork.getLoadFileWork().getSourcePath());
@@ -162,7 +165,7 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     reset(mockWork);
     when(mockWork.getLoadTableWork()).thenReturn(new LoadTableDesc(
         condOutputPath, tableDesc, null, null));
-    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+    newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
     assertNotNull(newWork);
     assertNotEquals(newWork, mockWork);
     assertEquals(condInputPath, newWork.getLoadTableWork().getSourcePath());
@@ -181,7 +184,8 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
     List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);
 
-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
@@ -221,7 +225,8 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
     List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);
 
-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
@@ -255,7 +260,8 @@ public class TestGenMapRedUtilsCreateConditionalTask {
     Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
     List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);
 
-    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+    GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+        moveTaskList, hiveConf, dummyMRTask, new LineageState());
     ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
     Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
     Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);