Posted to commits@hive.apache.org by ga...@apache.org on 2017/12/18 23:25:55 UTC
[12/50] [abbrv] hive git commit: HIVE-18054: Make Lineage work with concurrent queries on a Session (Andrew Sherman, reviewed by Sahil Takiar)
HIVE-18054: Make Lineage work with concurrent queries on a Session (Andrew Sherman, reviewed by Sahil Takiar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/646ccce8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/646ccce8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/646ccce8
Branch: refs/heads/standalone-metastore
Commit: 646ccce8ea3e8c944be164f86dbd5d3428bdbc44
Parents: f52e8b4
Author: Andrew Sherman <as...@cloudera.com>
Authored: Sat Dec 16 15:14:54 2017 -0600
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Sat Dec 16 15:24:11 2017 -0600
----------------------------------------------------------------------
.../java/org/apache/hive/jdbc/ReadableHook.java | 52 +++++++++
.../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 114 +++++++++++++++++++
.../java/org/apache/hadoop/hive/ql/Driver.java | 46 ++++++--
.../org/apache/hadoop/hive/ql/QueryState.java | 35 +++++-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 6 +-
.../apache/hadoop/hive/ql/exec/MoveTask.java | 9 +-
.../org/apache/hadoop/hive/ql/exec/Task.java | 4 +
.../bootstrap/load/table/LoadPartitions.java | 3 +-
.../repl/bootstrap/load/table/LoadTable.java | 3 +-
.../hadoop/hive/ql/hooks/HookContext.java | 9 +-
.../hadoop/hive/ql/hooks/LineageLogger.java | 56 +++++++--
.../hive/ql/index/AggregateIndexHandler.java | 7 +-
.../hadoop/hive/ql/index/HiveIndexHandler.java | 6 +-
.../hive/ql/index/TableBasedIndexHandler.java | 18 ++-
.../ql/index/bitmap/BitmapIndexHandler.java | 8 +-
.../ql/index/compact/CompactIndexHandler.java | 8 +-
.../hive/ql/optimizer/GenMRFileSink1.java | 2 +-
.../hive/ql/optimizer/GenMapRedUtils.java | 27 +++--
.../hadoop/hive/ql/optimizer/IndexUtils.java | 6 +-
.../hive/ql/optimizer/lineage/Generator.java | 8 +-
.../hive/ql/parse/DDLSemanticAnalyzer.java | 15 ++-
.../hive/ql/parse/ExplainSemanticAnalyzer.java | 2 +-
.../hadoop/hive/ql/parse/GenTezUtils.java | 3 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 5 +-
.../hadoop/hive/ql/parse/IndexUpdater.java | 9 +-
.../hive/ql/parse/LoadSemanticAnalyzer.java | 2 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 4 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 8 +-
.../hadoop/hive/ql/parse/TaskCompiler.java | 11 +-
.../hive/ql/parse/spark/GenSparkUtils.java | 2 +-
.../apache/hadoop/hive/ql/plan/MoveWork.java | 25 +---
.../hadoop/hive/ql/session/LineageState.java | 2 +-
.../hadoop/hive/ql/session/SessionState.java | 15 ---
...TestGenMapRedUtilsCreateConditionalTask.java | 18 ++-
34 files changed, 403 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
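Before the per-file diffs, a short orientation: the heart of HIVE-18054 is moving lineage tracking off the session and onto the query. Previously every query on a session read, wrote, and cleared the single LineageState held by SessionState; after this patch each QueryState owns its own, so concurrent queries no longer clobber each other's lineage. A minimal sketch of the contrast, using only accessors visible in the diffs below (queryState is assumed to be in scope):

    import org.apache.hadoop.hive.ql.session.LineageState;

    // Before this patch: one LineageState per session, shared by all queries.
    //   LineageState shared = SessionState.get().getLineageState();  // removed
    // After this patch: one LineageState per query, owned by its QueryState.
    LineageState perQuery = queryState.getLineageState();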
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java
new file mode 100644
index 0000000..2dd283f
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/ReadableHook.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.jdbc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+
+/**
+ * An ExecuteWithHookContext that stores HookContexts in memory and makes them available for reading.
+ */
+public class ReadableHook implements ExecuteWithHookContext {
+
+ private static List<HookContext> hookList = Collections.synchronizedList(new ArrayList<>());
+
+ @Override
+ public void run(HookContext hookContext) throws Exception {
+ hookList.add(hookContext);
+ }
+
+ /**
+ * @return the stored HookContexts.
+ */
+ public static List<HookContext> getHookList() {
+ return hookList;
+ }
+
+ /**
+ * Clear the stored HookContexts.
+ */
+ public static void clear() {
+ hookList.clear();
+ }
+}
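For context on how this hook is used: it is registered on the post-execution hook list and later drained by the test. A minimal usage sketch, assuming a HiveConf named conf is in scope (the actual wiring appears in the TestJdbcWithMiniHS2 diff below):

    // Register the hook alongside LineageLogger (as the test setup below does):
    conf.setVar(HiveConf.ConfVars.POSTEXECHOOKS,
        ReadableHook.class.getName() + "," + LineageLogger.class.getName());

    // Around a batch of queries, reset and then read the captured contexts:
    ReadableHook.clear();
    // ... execute statements ...
    List<HookContext> captured = ReadableHook.getHookList();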
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 70bd29c..ffeee69 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -40,6 +40,7 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -47,6 +48,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
@@ -64,8 +66,12 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.LineageLogger;
+import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;
import org.apache.hive.common.util.ReflectionUtil;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
import org.apache.hive.service.cli.HiveSQLException;
@@ -205,6 +211,9 @@ public class TestJdbcWithMiniHS2 {
conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
conf.setBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false);
conf.setBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER, false);
+ // store post-exec hook calls so we can look at them later
+ conf.setVar(ConfVars.POSTEXECHOOKS, ReadableHook.class.getName() + "," +
+ LineageLogger.class.getName());
MiniHS2.Builder builder = new MiniHS2.Builder().withConf(conf).cleanupLocalDirOnStartup(false);
if (httpMode) {
builder = builder.withHTTPTransport();
@@ -1503,4 +1512,109 @@ public class TestJdbcWithMiniHS2 {
stmt.close();
fsConn.close();
}
+
+ /**
+ * A test that checks that Lineage is correct when multiple concurrent
+ * requests are made on a connection.
+ */
+ @Test
+ public void testConcurrentLineage() throws Exception {
+ // setup to run concurrent operations
+ Statement stmt = conTestDb.createStatement();
+ setSerializeInTasksInConf(stmt);
+ stmt.execute("drop table if exists testConcurrentLineage1");
+ stmt.execute("drop table if exists testConcurrentLineage2");
+ stmt.execute("create table testConcurrentLineage1 (col1 int)");
+ stmt.execute("create table testConcurrentLineage2 (col2 int)");
+
+ // clear the list of stored HookContexts
+ ReadableHook.clear();
+
+ // run 5 concurrent tasks, each doing 2 sql inserts
+ int numThreads = 5; // set to 1 for single threading
+ int concurrentCalls = 5;
+ ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+ try {
+ List<InsertCallable> tasks = new ArrayList<>();
+ for (int i = 0; i < concurrentCalls; i++) {
+ InsertCallable runner = new InsertCallable(conTestDb);
+ tasks.add(runner);
+ }
+ List<Future<Void>> futures = pool.invokeAll(tasks);
+ for (Future<Void> future : futures) {
+ future.get(20, TimeUnit.SECONDS);
+ }
+ // check to see that the vertices are correct
+ checkVertices();
+ } finally {
+ // clean up
+ stmt.execute("drop table testConcurrentLineage1");
+ stmt.execute("drop table testConcurrentLineage2");
+ stmt.close();
+ pool.shutdownNow();
+ }
+ }
+
+ /**
+ * A Callable that does 2 inserts
+ */
+ private class InsertCallable implements Callable<Void> {
+ private Connection connection;
+
+ InsertCallable(Connection conn) {
+ this.connection = conn;
+ }
+
+ @Override public Void call() throws Exception {
+ doLineageInserts(connection);
+ return null;
+ }
+
+ private void doLineageInserts(Connection connection) throws SQLException {
+ Statement stmt = connection.createStatement();
+ stmt.execute("insert into testConcurrentLineage1 values (1)");
+ stmt.execute("insert into testConcurrentLineage2 values (2)");
+ }
+ }
+ /**
+ * Check that the vertices derived from the HookContexts are correct.
+ */
+ private void checkVertices() {
+ List<Set<LineageLogger.Vertex>> verticesLists = getVerticesFromHooks();
+
+ assertEquals("5 runs of 2 inserts makes 10", 10, verticesLists.size());
+ for (Set<LineageLogger.Vertex> vertices : verticesLists) {
+ assertFalse("Each insert affects a column so should be some vertices",
+ vertices.isEmpty());
+ assertEquals("Each insert affects one column so should be one vertex",
+ 1, vertices.size());
+ Iterator<LineageLogger.Vertex> iterator = vertices.iterator();
+ assertTrue(iterator.hasNext());
+ LineageLogger.Vertex vertex = iterator.next();
+ assertEquals(0, vertex.getId());
+ assertEquals(LineageLogger.Vertex.Type.COLUMN, vertex.getType());
+ String label = vertex.getLabel();
+ System.out.println("vertex.getLabel() = " + label);
+ assertTrue("did not see one of the 2 expected column names",
+ label.equals("testjdbcminihs2.testconcurrentlineage1.col1") ||
+ label.equals("testjdbcminihs2.testconcurrentlineage2.col2"));
+ }
+ }
+
+ /**
+ * Use the logic in LineageLogger to get vertices from HookContexts.
+ */
+ private List<Set<LineageLogger.Vertex>> getVerticesFromHooks() {
+ List<Set<LineageLogger.Vertex>> verticesLists = new ArrayList<>();
+ List<HookContext> hookList = ReadableHook.getHookList();
+ for (HookContext hookContext : hookList) {
+ QueryPlan plan = hookContext.getQueryPlan();
+ LineageCtx.Index index = hookContext.getIndex();
+ assertNotNull(index);
+ List<LineageLogger.Edge> edges = LineageLogger.getEdges(plan, index);
+ Set<LineageLogger.Vertex> vertices = LineageLogger.getVertices(edges);
+ verticesLists.add(vertices);
+ }
+ return verticesLists;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d3df015..b168906 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -112,6 +112,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
+import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.ql.wm.WmContext;
@@ -374,12 +375,20 @@ public class Driver implements CommandProcessor {
this(getNewQueryState(conf), null);
}
+ // Pass lineageState when a driver instantiates another Driver to run
+ // or compile another query
+ public Driver(HiveConf conf, LineageState lineageState) {
+ this(getNewQueryState(conf, lineageState), null);
+ }
+
public Driver(HiveConf conf, HiveTxnManager txnMgr) {
this(getNewQueryState(conf), null, null, txnMgr);
}
- public Driver(HiveConf conf, Context ctx) {
- this(getNewQueryState(conf), null, null);
+ // Pass lineageState when a driver instantiates another Driver to run
+ // or compile another query
+ public Driver(HiveConf conf, Context ctx, LineageState lineageState) {
+ this(getNewQueryState(conf, lineageState), null, null);
this.ctx = ctx;
}
@@ -387,6 +396,12 @@ public class Driver implements CommandProcessor {
this(getNewQueryState(conf), userName, null);
}
+ // Pass lineageState when a driver instantiates another Driver to run
+ // or compile another query
+ public Driver(HiveConf conf, String userName, LineageState lineageState) {
+ this(getNewQueryState(conf, lineageState), userName, null);
+ }
+
public Driver(QueryState queryState, String userName) {
this(queryState, userName, new HooksLoader(queryState.getConf()), null, null);
}
@@ -425,6 +440,20 @@ public class Driver implements CommandProcessor {
}
/**
+ * Generate a new QueryState object, making sure that a new queryId is generated.
+ * @param conf The HiveConf which should be used
+ * @param lineageState a LineageState to be set in the new QueryState object
+ * @return The new QueryState object
+ */
+ private static QueryState getNewQueryState(HiveConf conf, LineageState lineageState) {
+ return new QueryState.Builder()
+ .withGenerateNewQueryId(true)
+ .withHiveConf(conf)
+ .withLineageState(lineageState)
+ .build();
+ }
+
+ /**
* Compile a new query. Any currently-planned query associated with this Driver is discarded.
* Do not reset id for inner queries (index, etc.). Task ids are used for task identity checks.
*
@@ -1336,9 +1365,6 @@ public class Driver implements CommandProcessor {
private void releaseResources() {
releasePlan();
releaseDriverContext();
- if (SessionState.get() != null) {
- SessionState.get().getLineageState().clear();
- }
}
@Override
@@ -2404,9 +2430,6 @@ public class Driver implements CommandProcessor {
releaseFetchTask();
releaseResStream();
releaseContext();
- if (SessionState.get() != null) {
- SessionState.get().getLineageState().clear();
- }
if(destroyed) {
if (!hiveLocks.isEmpty()) {
try {
@@ -2440,9 +2463,6 @@ public class Driver implements CommandProcessor {
lDrvState.stateLock.unlock();
LockedDriverState.removeLockedDriverState();
}
- if (SessionState.get() != null) {
- SessionState.get().getLineageState().clear();
- }
return 0;
}
@@ -2504,4 +2524,8 @@ public class Driver implements CommandProcessor {
releaseResources();
this.queryState = getNewQueryState(queryState.getConf());
}
+
+ public QueryState getQueryState() {
+ return queryState;
+ }
}
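The new overloads above exist so that a Driver which compiles or runs an inner query (EXPLAIN, index rebuilds, re-entrant QL) can hand its own LineageState to the child instead of reaching for the session. A sketch of the intended call pattern, assuming conf, innerSql, and a parent Driver are in scope:

    // The child records lineage into the parent query's LineageState:
    Driver child = new Driver(conf, parent.getQueryState().getLineageState());
    child.compile(innerSql, false);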
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index f3a46db..4f0c165 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.session.LineageState;
/**
* The class to store query level info such as queryId. Multiple queries can run
@@ -40,12 +41,17 @@ public class QueryState {
private HiveOperation commandType;
/**
+ * Per-query Lineage state to track what happens in the query.
+ */
+ private LineageState lineageState = new LineageState();
+
+ /**
* transaction manager used in the query.
*/
private HiveTxnManager txnManager;
/**
- * Private constructor, use QueryState.Builder instead
+ * Private constructor, use QueryState.Builder instead.
* @param conf The query specific configuration object
*/
private QueryState(HiveConf conf) {
@@ -79,6 +85,14 @@ public class QueryState {
return queryConf;
}
+ public LineageState getLineageState() {
+ return lineageState;
+ }
+
+ public void setLineageState(LineageState lineageState) {
+ this.lineageState = lineageState;
+ }
+
public HiveTxnManager getTxnManager() {
return txnManager;
}
@@ -95,9 +109,10 @@ public class QueryState {
private boolean runAsync = false;
private boolean generateNewQueryId = false;
private HiveConf hiveConf = null;
+ private LineageState lineageState = null;
/**
- * Default constructor - use this builder to create a QueryState object
+ * Default constructor - use this builder to create a QueryState object.
*/
public Builder() {
}
@@ -149,6 +164,16 @@ public class QueryState {
}
/**
+ * Add a LineageState that will be set in the built QueryState.
+ * @param lineageState the source lineageState
+ * @return the builder
+ */
+ public Builder withLineageState(LineageState lineageState) {
+ this.lineageState = lineageState;
+ return this;
+ }
+
+ /**
* Creates the QueryState object. The default values are:
* - runAsync false
* - confOverlay null
@@ -184,7 +209,11 @@ public class QueryState {
queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, QueryPlan.makeQueryId());
}
- return new QueryState(queryConf);
+ QueryState queryState = new QueryState(queryConf);
+ if (lineageState != null) {
+ queryState.setLineageState(lineageState);
+ }
+ return queryState;
}
}
}
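QueryState.Builder now takes an optional LineageState; if none is supplied, the built QueryState keeps the fresh LineageState its field initializer creates. A sketch, assuming conf and an existing LineageState named lineage are in scope:

    QueryState qs = new QueryState.Builder()
        .withGenerateNewQueryId(true)
        .withHiveConf(conf)
        .withLineageState(lineage)  // optional; omit to get a fresh LineageState
        .build();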
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 55ef8de..05041cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4478,7 +4478,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
}
}
// Don't set inputs and outputs - the locks have already been taken so it's pointless.
- MoveWork mw = new MoveWork(null, null, null, null, false, SessionState.get().getLineageState());
+ MoveWork mw = new MoveWork(null, null, null, null, false);
mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null));
ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId, stmtId);
Task<?> mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf);
@@ -4909,7 +4909,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName());
if (crtTbl.isCTAS()) {
DataContainer dc = new DataContainer(createdTable.getTTable());
- SessionState.get().getLineageState().setLineage(
+ queryState.getLineageState().setLineage(
createdTable.getPath(), dc, createdTable.getCols()
);
}
@@ -5137,7 +5137,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
//set lineage info
DataContainer dc = new DataContainer(tbl.getTTable());
- SessionState.get().getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols());
+ queryState.getLineageState().setLineage(new Path(crtView.getViewName()), dc, tbl.getCols());
}
return 0;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index f5a5e71..8387208 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -398,7 +398,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
dc = handleStaticParts(db, table, tbd, ti);
}
}
- if (work.getLineagState() != null && dc != null) {
+ if (dc != null) {
// If we are doing an update or a delete the number of columns in the table will not
// match the number of columns in the file sink. For update there will be one too many
// (because of the ROW__ID), and in the case of the delete there will be just the
@@ -416,7 +416,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
tableCols = table.getCols();
break;
}
- work.getLineagState().setLineage(tbd.getSourcePath(), dc, tableCols);
+ queryState.getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols);
}
releaseLocks(tbd);
}
@@ -552,10 +552,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
dc = new DataContainer(table.getTTable(), partn.getTPartition());
// Don't set lineage on delete as we don't have all the columns
- if (work.getLineagState() != null &&
- work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
+ if (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
- work.getLineagState().setLineage(tbd.getSourcePath(), dc,
+ queryState.getLineageState().setLineage(tbd.getSourcePath(), dc,
table.getCols());
}
LOG.info("Loading partition " + entry.getKey());
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 1f0487f..d75fcf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -649,4 +649,8 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
return true;
}
+ public QueryState getQueryState() {
+ return queryState;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 262225f..1a542e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -245,8 +245,7 @@ public class LoadPartitions {
SessionState.get().getTxnMgr().getCurrentTxnId()
);
loadTableWork.setInheritTableSpecs(false);
- MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false,
- context.sessionStateLineageState);
+ MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
return TaskFactory.get(work, context.hiveConf);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 545b7a8..f5125a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -233,8 +233,7 @@ public class LoadTable {
SessionState.get().getTxnMgr().getCurrentTxnId()
);
MoveWork moveWork =
- new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false,
- context.sessionStateLineageState);
+ new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
copyTask.addDependentTask(loadTableTask);
return copyTask;
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
index 7b61730..93f1da7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -88,12 +87,8 @@ public class HookContext {
inputs = queryPlan.getInputs();
outputs = queryPlan.getOutputs();
ugi = Utils.getUGI();
- linfo= null;
- depMap = null;
- if(SessionState.get() != null){
- linfo = SessionState.get().getLineageState().getLineageInfo();
- depMap = SessionState.get().getLineageState().getIndex();
- }
+ linfo = queryState.getLineageState().getLineageInfo();
+ depMap = queryState.getLineageState().getIndex();
this.userName = userName;
this.ipAddress = ipAddress;
this.hiveInstanceAddress = hiveInstanceAddress;
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
index 2f764f8..06eb9c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageLogger.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.hooks;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
@@ -74,7 +75,15 @@ public class LineageLogger implements ExecuteWithHookContext {
private static final String FORMAT_VERSION = "1.0";
- final static class Edge {
+ /**
+ * An edge in lineage.
+ */
+ @VisibleForTesting
+ public static final class Edge {
+
+ /**
+ * The types of Edge.
+ */
public static enum Type {
PROJECTION, PREDICATE
}
@@ -92,7 +101,15 @@ public class LineageLogger implements ExecuteWithHookContext {
}
}
- final static class Vertex {
+ /**
+ * A vertex in lineage.
+ */
+ @VisibleForTesting
+ public static final class Vertex {
+
+ /**
+ * The types of Vertex.
+ */
public static enum Type {
COLUMN, TABLE
}
@@ -125,6 +142,21 @@ public class LineageLogger implements ExecuteWithHookContext {
Vertex vertex = (Vertex) obj;
return label.equals(vertex.label) && type == vertex.type;
}
+
+ @VisibleForTesting
+ public Type getType() {
+ return type;
+ }
+
+ @VisibleForTesting
+ public String getLabel() {
+ return label;
+ }
+
+ @VisibleForTesting
+ public int getId() {
+ return id;
+ }
}
@Override
@@ -203,7 +235,7 @@ public class LineageLogger implements ExecuteWithHookContext {
/**
* Log an error to the console if available.
*/
- private void log(String error) {
+ private static void log(String error) {
LogHelper console = SessionState.getConsole();
if (console != null) {
console.printError(error);
@@ -214,7 +246,8 @@ public class LineageLogger implements ExecuteWithHookContext {
* Based on the final select operator, find out all the target columns.
* For each target column, find out its sources based on the dependency index.
*/
- private List<Edge> getEdges(QueryPlan plan, Index index) {
+ @VisibleForTesting
+ public static List<Edge> getEdges(QueryPlan plan, Index index) {
LinkedHashMap<String, ObjectPair<SelectOperator,
org.apache.hadoop.hive.ql.metadata.Table>> finalSelOps = index.getFinalSelectOps();
Map<String, Vertex> vertexCache = new LinkedHashMap<String, Vertex>();
@@ -292,7 +325,7 @@ public class LineageLogger implements ExecuteWithHookContext {
return edges;
}
- private void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges,
+ private static void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges,
Set<BaseColumnInfo> srcCols, Vertex target, String expr, Edge.Type type) {
Set<Vertex> targets = new LinkedHashSet<Vertex>();
targets.add(target);
@@ -304,7 +337,7 @@ public class LineageLogger implements ExecuteWithHookContext {
* If found, add the new targets to this edge's target vertex list.
* Otherwise, create a new edge and add it to the edge list.
*/
- private void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges,
+ private static void addEdge(Map<String, Vertex> vertexCache, List<Edge> edges,
Set<BaseColumnInfo> srcCols, Set<Vertex> targets, String expr, Edge.Type type) {
Set<Vertex> sources = createSourceVertices(vertexCache, srcCols);
Edge edge = findSimilarEdgeBySources(edges, sources, expr, type);
@@ -319,7 +352,7 @@ public class LineageLogger implements ExecuteWithHookContext {
* Convert a list of columns to a set of vertices.
* Use cached vertices if possible.
*/
- private Set<Vertex> createSourceVertices(
+ private static Set<Vertex> createSourceVertices(
Map<String, Vertex> vertexCache, Collection<BaseColumnInfo> baseCols) {
Set<Vertex> sources = new LinkedHashSet<Vertex>();
if (baseCols != null && !baseCols.isEmpty()) {
@@ -346,7 +379,7 @@ public class LineageLogger implements ExecuteWithHookContext {
/**
* Find a vertex in the cache, or create one if not found.
*/
- private Vertex getOrCreateVertex(
+ private static Vertex getOrCreateVertex(
Map<String, Vertex> vertices, String label, Vertex.Type type) {
Vertex vertex = vertices.get(label);
if (vertex == null) {
@@ -359,7 +392,7 @@ public class LineageLogger implements ExecuteWithHookContext {
/**
* Find an edge that has the same type, expression, and sources.
*/
- private Edge findSimilarEdgeBySources(
+ private static Edge findSimilarEdgeBySources(
List<Edge> edges, Set<Vertex> sources, String expr, Edge.Type type) {
for (Edge edge: edges) {
if (edge.type == type && StringUtils.equals(edge.expr, expr)
@@ -373,7 +406,7 @@ public class LineageLogger implements ExecuteWithHookContext {
/**
* Generate normalized name for a given target column.
*/
- private String getTargetFieldName(int fieldIndex,
+ private static String getTargetFieldName(int fieldIndex,
String destTableName, List<String> colNames, List<FieldSchema> fieldSchemas) {
String fieldName = fieldSchemas.get(fieldIndex).getName();
String[] parts = fieldName.split("\\.");
@@ -394,7 +427,8 @@ public class LineageLogger implements ExecuteWithHookContext {
* Get all the vertices of all edges. Targets first,
* then sources. Assign an id to each vertex.
*/
- private Set<Vertex> getVertices(List<Edge> edges) {
+ @VisibleForTesting
+ public static Set<Vertex> getVertices(List<Edge> edges) {
Set<Vertex> vertices = new LinkedHashSet<Vertex>();
for (Edge edge: edges) {
vertices.addAll(edge.targets);
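With getEdges and getVertices now public, static, and @VisibleForTesting, a test can rebuild the lineage graph straight from a captured HookContext. A minimal sketch, assuming a hookContext is in scope (this mirrors getVerticesFromHooks in the test above):

    QueryPlan plan = hookContext.getQueryPlan();
    LineageCtx.Index index = hookContext.getIndex();
    List<LineageLogger.Edge> edges = LineageLogger.getEdges(plan, index);
    Set<LineageLogger.Vertex> vertices = LineageLogger.getVertices(edges);
    for (LineageLogger.Vertex v : vertices) {
      System.out.println(v.getId() + " " + v.getType() + " " + v.getLabel());
    }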
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
index 68709b4..bf06723 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-
+import org.apache.hadoop.hive.ql.session.LineageState;
/**
* Index handler for indexes that have aggregate functions on indexed columns.
@@ -90,7 +90,8 @@ public class AggregateIndexHandler extends CompactIndexHandler {
Set<WriteEntity> outputs,
Index index, boolean partitioned,
PartitionDesc indexTblPartDesc, String indexTableName,
- PartitionDesc baseTablePartDesc, String baseTableName, String dbName) {
+ PartitionDesc baseTablePartDesc, String baseTableName, String dbName,
+ LineageState lineageState) {
List<FieldSchema> indexField = index.getSd().getCols();
String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
@@ -152,7 +153,7 @@ public class AggregateIndexHandler extends CompactIndexHandler {
builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
- command, (LinkedHashMap<String, String>) partSpec, indexTableName, dbName);
+ command, (LinkedHashMap<String, String>) partSpec, indexTableName, dbName, lineageState);
return rootTask;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java
index 1e577da..b6c0252 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexHandler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
/**
* HiveIndexHandler defines a pluggable interface for adding new index handlers
@@ -99,6 +100,9 @@ public interface HiveIndexHandler extends Configurable {
* outputs for hooks, supplemental outputs going
* along with the return value
*
+ * @param lineageState
+ * tracks Lineage for the query
+ *
* @return list of tasks to be executed in parallel for building the index
*
* @throws HiveException if plan generation fails
@@ -108,7 +112,7 @@ public interface HiveIndexHandler extends Configurable {
org.apache.hadoop.hive.metastore.api.Index index,
List<Partition> indexTblPartitions, List<Partition> baseTblPartitions,
org.apache.hadoop.hive.ql.metadata.Table indexTbl,
- Set<ReadEntity> inputs, Set<WriteEntity> outputs)
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs, LineageState lineageState)
throws HiveException;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
index 29886ae..744ac29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
/**
* Index handler for indexes that use tables to store indexes.
@@ -51,7 +52,8 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler {
org.apache.hadoop.hive.metastore.api.Index index,
List<Partition> indexTblPartitions, List<Partition> baseTblPartitions,
org.apache.hadoop.hive.ql.metadata.Table indexTbl,
- Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws HiveException {
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+ LineageState lineageState) throws HiveException {
try {
TableDesc desc = Utilities.getTableDesc(indexTbl);
@@ -66,7 +68,7 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler {
Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, false,
new PartitionDesc(desc, null), indexTbl.getTableName(),
new PartitionDesc(Utilities.getTableDesc(baseTbl), null),
- baseTbl.getTableName(), indexTbl.getDbName());
+ baseTbl.getTableName(), indexTbl.getDbName(), lineageState);
indexBuilderTasks.add(indexBuilder);
} else {
@@ -89,7 +91,8 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler {
// for each partition, spawn a map reduce task.
Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, true,
new PartitionDesc(indexPart), indexTbl.getTableName(),
- new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName());
+ new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName(),
+ lineageState);
indexBuilderTasks.add(indexBuilder);
}
}
@@ -102,15 +105,18 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler {
protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
Index index, boolean partitioned,
PartitionDesc indexTblPartDesc, String indexTableName,
- PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+ PartitionDesc baseTablePartDesc, String baseTableName, String dbName,
+ LineageState lineageState) throws HiveException {
return getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(),
- partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName);
+ partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName,
+ lineageState);
}
protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
List<FieldSchema> indexField, boolean partitioned,
PartitionDesc indexTblPartDesc, String indexTableName,
- PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+ PartitionDesc baseTablePartDesc, String baseTableName, String dbName,
+ LineageState lineageState) throws HiveException {
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
index 7b067a0..9117159 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
@@ -115,7 +116,7 @@ public class BitmapIndexHandler extends TableBasedIndexHandler {
LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString());
HiveConf queryConf = new HiveConf(pctx.getConf(), BitmapIndexHandler.class);
HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false);
- Driver driver = new Driver(queryConf);
+ Driver driver = new Driver(queryConf, pctx.getQueryState().getLineageState());
driver.compile(qlCommand.toString(), false);
queryContext.setIndexIntermediateFile(tmpFile);
@@ -222,7 +223,8 @@ public class BitmapIndexHandler extends TableBasedIndexHandler {
protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
List<FieldSchema> indexField, boolean partitioned,
PartitionDesc indexTblPartDesc, String indexTableName,
- PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+ PartitionDesc baseTablePartDesc, String baseTableName, String dbName,
+ LineageState lineageState) throws HiveException {
HiveConf builderConf = new HiveConf(getConf(), BitmapIndexHandler.class);
HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEROWOFFSET, true);
@@ -290,7 +292,7 @@ public class BitmapIndexHandler extends TableBasedIndexHandler {
}
Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
- command, partSpec, indexTableName, dbName);
+ command, partSpec, indexTableName, dbName, lineageState);
return rootTask;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
index 504b062..73278cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
@@ -94,7 +95,8 @@ public class CompactIndexHandler extends TableBasedIndexHandler {
protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
List<FieldSchema> indexField, boolean partitioned,
PartitionDesc indexTblPartDesc, String indexTableName,
- PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+ PartitionDesc baseTablePartDesc, String baseTableName, String dbName,
+ LineageState lineageState) throws HiveException {
String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
@@ -150,7 +152,7 @@ public class CompactIndexHandler extends TableBasedIndexHandler {
builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
- command, partSpec, indexTableName, dbName);
+ command, partSpec, indexTableName, dbName, lineageState);
return rootTask;
}
@@ -189,7 +191,7 @@ public class CompactIndexHandler extends TableBasedIndexHandler {
LOG.info("Generating tasks for re-entrant QL query: " + qlCommand.toString());
HiveConf queryConf = new HiveConf(pctx.getConf(), CompactIndexHandler.class);
HiveConf.setBoolVar(queryConf, HiveConf.ConfVars.COMPRESSRESULT, false);
- Driver driver = new Driver(queryConf);
+ Driver driver = new Driver(queryConf, pctx.getQueryState().getLineageState());
driver.compile(qlCommand.toString(), false);
if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
index d7a83f7..bb42dde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
@@ -112,7 +112,7 @@ public class GenMRFileSink1 implements NodeProcessor {
LOG.info("using CombineHiveInputformat for the merge job");
GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName,
ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(),
- hconf, currTask);
+ hconf, currTask, parseCtx.getQueryState().getLineageState());
}
FileSinkDesc fileSinkDesc = fsOp.getConf();
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index bdaf105..a0b2678 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1229,6 +1229,7 @@ public final class GenMapRedUtils {
* @param mvTasks
* @param conf
* @param currTask
+ * @param lineageState
* @throws SemanticException
* create a Map-only merge job using CombineHiveInputFormat for all partitions with
@@ -1257,10 +1258,11 @@ public final class GenMapRedUtils {
* directories.
*
*/
- public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
- Path finalName, DependencyCollectionTask dependencyTask,
- List<Task<MoveWork>> mvTasks, HiveConf conf,
- Task<? extends Serializable> currTask) throws SemanticException {
+ public static void createMRWorkForMergingFiles(FileSinkOperator fsInput,
+ Path finalName, DependencyCollectionTask dependencyTask,
+ List<Task<MoveWork>> mvTasks, HiveConf conf,
+ Task<? extends Serializable> currTask, LineageState lineageState)
+ throws SemanticException {
//
// 1. create the operator tree
@@ -1370,8 +1372,7 @@ public final class GenMapRedUtils {
if (srcMmWriteId == null) {
// Only create the movework for non-MM table. No action needed for a MM table.
dummyMv = new MoveWork(null, null, null,
- new LoadFileDesc(inputDirName, finalName, true, null, null, false), false,
- SessionState.get().getLineageState());
+ new LoadFileDesc(inputDirName, finalName, true, null, null, false), false);
}
// Use the original fsOp path here in case of MM - while the new FSOP merges files inside the
// MM directory, the original MoveTask still commits based on the parent. Note that this path
@@ -1382,7 +1383,7 @@ public final class GenMapRedUtils {
Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(
mvTasks, fsopPath, fsInputDesc.isMmTable());
ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
- fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask);
+ fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask, lineageState);
// keep the dynamic partition context in conditional task resolver context
ConditionalResolverMergeFilesCtx mrCtx =
@@ -1730,15 +1731,16 @@ public final class GenMapRedUtils {
*
* @param condInputPath A path that the ConditionalTask uses as input for its sub-tasks.
* @param linkedMoveWork A MoveWork that the ConditionalTask uses to link to its sub-tasks.
+ * @param lineageState A LineageState used to track what changes.
* @return A new MoveWork that has the Conditional input path as source and the linkedMoveWork as target.
*/
@VisibleForTesting
- protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork) {
+ protected static MoveWork mergeMovePaths(Path condInputPath, MoveWork linkedMoveWork,
+ LineageState lineageState) {
MoveWork newWork = new MoveWork(linkedMoveWork);
LoadFileDesc fileDesc = null;
LoadTableDesc tableDesc = null;
- LineageState lineageState = SessionState.get().getLineageState();
if (linkedMoveWork.getLoadFileWork() != null) {
fileDesc = new LoadFileDesc(linkedMoveWork.getLoadFileWork());
fileDesc.setSourcePath(condInputPath);
@@ -1776,13 +1778,15 @@ public final class GenMapRedUtils {
* a MoveTask that may be linked to the conditional sub-tasks
* @param dependencyTask
* a dependency task that may be linked to the conditional sub-tasks
+ * @param lineageState
+ * to track activity
* @return The conditional task
*/
@SuppressWarnings("unchecked")
private static ConditionalTask createCondTask(HiveConf conf,
Task<? extends Serializable> currTask, MoveWork mvWork, Serializable mergeWork,
Path condInputPath, Path condOutputPath, Task<MoveWork> moveTaskToLink,
- DependencyCollectionTask dependencyTask) {
+ DependencyCollectionTask dependencyTask, LineageState lineageState) {
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("Creating conditional merge task for " + condInputPath);
}
@@ -1795,7 +1799,8 @@ public final class GenMapRedUtils {
Serializable workForMoveOnlyTask = moveWork;
if (shouldMergeMovePaths) {
- workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork());
+ workForMoveOnlyTask = mergeMovePaths(condInputPath, moveTaskToLink.getWork(),
+ lineageState);
}
// There are 3 options for this ConditionalTask:
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
index 338c185..f69c9a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.LineageState;
/**
* Utility class for index support.
@@ -221,10 +222,11 @@ public final class IndexUtils {
StringBuilder command,
LinkedHashMap<String, String> partSpec,
String indexTableName,
- String dbName){
+ String dbName,
+ LineageState lineageState){
// Don't try to index optimize the query to build the index
HiveConf.setBoolVar(builderConf, HiveConf.ConfVars.HIVEOPTINDEXFILTER, false);
- Driver driver = new Driver(builderConf, SessionState.get().getUserName());
+ Driver driver = new Driver(builderConf, SessionState.get().getUserName(), lineageState);
driver.compile(command.toString(), false);
Task<?> rootTask = driver.getPlan().getRootTasks().get(0);
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
index e6c0771..0d72a1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/Generator.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hive.ql.optimizer.Transform;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,9 +84,10 @@ public class Generator extends Transform {
return pctx;
}
}
-
- Index index = SessionState.get() != null ?
- SessionState.get().getLineageState().getIndex() : new Index();
+ Index index = pctx.getQueryState().getLineageState().getIndex();
+ if (index == null) {
+ index = new Index();
+ }
long sTime = System.currentTimeMillis();
// Create the lineage context
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index a09b796..971a061 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -158,6 +158,7 @@ import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
import org.apache.hadoop.hive.ql.plan.CreateOrAlterWMPoolDesc;
import org.apache.hadoop.hive.ql.plan.CreateOrDropTriggerToPoolMappingDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -1485,8 +1486,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
partSpec == null ? new HashMap<>() : partSpec, null);
ltd.setLbCtx(lbCtx);
@SuppressWarnings("unchecked")
- Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(
- null, null, ltd, null, false, SessionState.get().getLineageState()), conf);
+ Task<MoveWork> moveTsk =
+ TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
truncateTask.addDependentTask(moveTsk);
// Recalculate the HDFS stats if auto gather stats is set
@@ -1703,8 +1704,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
indexTbl, db, indexTblPartitions);
}
+ LineageState lineageState = queryState.getLineageState();
List<Task<?>> ret = handler.generateIndexBuildTaskList(baseTbl,
- index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs());
+ index, indexTblPartitions, baseTblPartitions, indexTbl, getInputs(), getOutputs(),
+ lineageState);
return ret;
} catch (Exception e) {
throw new SemanticException(e);
@@ -2146,8 +2149,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
partSpec == null ? new HashMap<>() : partSpec, null);
ltd.setLbCtx(lbCtx);
- Task<MoveWork> moveTsk = TaskFactory.get(
- new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf);
+ Task<MoveWork> moveTsk =
+ TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
mergeTask.addDependentTask(moveTsk);
if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
@@ -3539,7 +3542,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
}
SessionState ss = SessionState.get();
String uName = (ss == null? null: ss.getUserName());
- Driver driver = new Driver(conf, uName);
+ Driver driver = new Driver(conf, uName, queryState.getLineageState());
int rc = driver.compile(cmd.toString(), false);
if (rc != 0) {
throw new SemanticException(ErrorMsg.NO_VALID_PARTN.getMsg());
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index 065c7e5..0eacfc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -134,7 +134,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
runCtx = new Context(conf);
// runCtx and ctx share the configuration, but not isExplainPlan()
runCtx.setExplainConfig(config);
- Driver driver = new Driver(conf, runCtx);
+ Driver driver = new Driver(conf, runCtx, queryState.getLineageState());
CommandProcessorResponse ret = driver.run(query);
if(ret.getResponseCode() == 0) {
// Note that we need to call getResults for simple fetch optimization.
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index b6f1139..e6d4cbe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -385,7 +385,8 @@ public class GenTezUtils {
+ fileSink.getConf().getDirName() + " to " + finalName);
GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
context.dependencyTask, context.moveTask,
- hconf, context.currentTask);
+ hconf, context.currentTask,
+ parseContext.getQueryState().getLineageState());
}
FetchTask fetchTask = parseContext.getFetchTask();
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 83d53bc..c79df56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -391,7 +391,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
Utilities.getTableDesc(table), new TreeMap<>(),
replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, txnId);
loadTableWork.setStmtId(stmtId);
- MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState());
+ MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork,
+ null, false);
Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
copyTask.addDependentTask(loadTableTask);
x.getTasks().add(copyTask);
@@ -495,7 +496,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
loadTableWork.setStmtId(stmtId);
loadTableWork.setInheritTableSpecs(false);
Task<?> loadPartTask = TaskFactory.get(new MoveWork(
- x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), x.getConf());
+ x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf());
copyTask.addDependentTask(loadPartTask);
addPartTask.addDependentTask(loadPartTask);
x.getTasks().add(copyTask);
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
index f31775e..ccf1e66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
import java.io.Serializable;
import java.util.LinkedList;
@@ -47,12 +48,14 @@ public class IndexUpdater {
private Hive hive;
private List<Task<? extends Serializable>> tasks;
private Set<ReadEntity> inputs;
+ private LineageState lineageState;
-
- public IndexUpdater(List<LoadTableDesc> loadTableWork, Set<ReadEntity> inputs, Configuration conf) {
+ public IndexUpdater(List<LoadTableDesc> loadTableWork, Set<ReadEntity> inputs, Configuration conf,
+ LineageState lineageState) {
this.loadTableWork = loadTableWork;
this.inputs = inputs;
this.conf = new HiveConf(conf, IndexUpdater.class);
+ this.lineageState = lineageState;
this.tasks = new LinkedList<Task<? extends Serializable>>();
}
@@ -133,7 +136,7 @@ public class IndexUpdater {
}
private void compileRebuild(String query) {
- Driver driver = new Driver(this.conf);
+ Driver driver = new Driver(this.conf, lineageState);
driver.compile(query, false);
tasks.addAll(driver.getPlan().getRootTasks());
inputs.addAll(driver.getPlan().getInputs());
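
IndexUpdater shows why the extra parameter exists: compileRebuild() spins up its own Driver, and that Driver should share the triggering query's LineageState rather than some session-wide one. A caller-side sketch (loadTableWork, inputs, conf and queryState assumed in scope, as in the TaskCompiler hunk further down):

    import org.apache.hadoop.hive.ql.parse.IndexUpdater;

    // The caller supplies its per-query LineageState; IndexUpdater forwards
    // it to the Driver it creates in compileRebuild().
    IndexUpdater indexUpdater =
        new IndexUpdater(loadTableWork, inputs, conf, queryState.getLineageState());
    // TaskCompiler wraps this call in a try/catch.
    List<Task<? extends Serializable>> indexUpdateTasks =
        indexUpdater.generateUpdateTasks();
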
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index cc956da..e600f7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -302,7 +302,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
Task<? extends Serializable> childTask = TaskFactory.get(
new MoveWork(getInputs(), getOutputs(), loadTableWork, null, true,
- isLocal, SessionState.get().getLineageState()), conf
+ isLocal), conf
);
if (rTask != null) {
rTask.addDependentTask(childTask);
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 498b674..80556ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -313,7 +313,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
ReplLoadWork replLoadWork =
new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern,
- SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
+ queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
return;
}
@@ -344,7 +344,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
- SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
+ queryState.getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
rootTasks.add(TaskFactory.get(replLoadWork, conf, true));
//
// for (FileStatus dir : dirsInLoadPath) {
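
Worth noting in these two hunks: only the lineage moves to QueryState, while the transaction manager is still read off the SessionState. A sketch of the split, with names as above (the local variables are hypothetical, added here for clarity):

    // Query-scoped: safe when several queries share one session.
    LineageState lineage = queryState.getLineageState();
    // Session-scoped: the txn manager is left on the session.
    long txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
    ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(),
        dbNameOrPattern, tblNameOrPattern, lineage, txnId);
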
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 28e3621..dcda8b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7336,8 +7336,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
private void handleLineage(LoadTableDesc ltd, Operator output)
throws SemanticException {
- if (ltd != null && SessionState.get() != null) {
- SessionState.get().getLineageState()
+ if (ltd != null) {
+ queryState.getLineageState()
.mapDirToOp(ltd.getSourcePath(), output);
} else if ( queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) {
@@ -7350,7 +7350,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
throw new SemanticException(e);
}
- SessionState.get().getLineageState()
+ queryState.getLineageState()
.mapDirToOp(tlocation, output);
}
}
@@ -11685,7 +11685,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
pCtx = t.transform(pCtx);
}
// we just use view name as location.
- SessionState.get().getLineageState()
+ queryState.getLineageState()
.mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp);
}
return;
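
mapDirToOp is the bookkeeping hook behind all three hunks: it records which operator produced a given output location (for a view, the view name stands in as a pseudo-path), to be read back when lineage is emitted. A sketch of the now query-scoped calls, assuming tlocation, output, createVwDesc and sinkOp are in scope as in the hunks:

    import org.apache.hadoop.fs.Path;

    // "This location is written by this operator", recorded in the query's
    // own LineageState rather than a session-wide one.
    queryState.getLineageState().mapDirToOp(tlocation, output);
    // Views have no physical location, so the view name is the key.
    queryState.getLineageState().mapDirToOp(
        new Path(createVwDesc.getViewName()), sinkOp);
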
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 7b29370..24559b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -105,7 +106,8 @@ public abstract class TaskCompiler {
}
@SuppressWarnings({"nls", "unchecked"})
- public void compile(final ParseContext pCtx, final List<Task<? extends Serializable>> rootTasks,
+ public void compile(final ParseContext pCtx,
+ final List<Task<? extends Serializable>> rootTasks,
final HashSet<ReadEntity> inputs, final HashSet<WriteEntity> outputs) throws SemanticException {
Context ctx = pCtx.getContext();
@@ -218,12 +220,13 @@ public abstract class TaskCompiler {
} else if (!isCStats) {
for (LoadTableDesc ltd : loadTableWork) {
Task<MoveWork> tsk = TaskFactory
- .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()),
+ .get(new MoveWork(null, null, ltd, null, false),
conf);
mvTask.add(tsk);
// Check to see if we are stale'ing any indexes and auto-update them if we want
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
- IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf);
+ IndexUpdater indexUpdater = new IndexUpdater(loadTableWork, inputs, conf,
+ queryState.getLineageState());
try {
List<Task<? extends Serializable>> indexUpdateTasks = indexUpdater
.generateUpdateTasks();
@@ -248,7 +251,7 @@ public abstract class TaskCompiler {
oneLoadFileForCtas = false;
}
mvTask.add(TaskFactory
- .get(new MoveWork(null, null, null, lfd, false, SessionState.get().getLineageState()),
+ .get(new MoveWork(null, null, null, lfd, false),
conf));
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 604c8ae..c6c7bf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -385,7 +385,7 @@ public class GenSparkUtils {
LOG.info("using CombineHiveInputformat for the merge job");
GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
context.dependencyTask, context.moveTask,
- hconf, context.currentTask);
+ hconf, context.currentTask, parseContext.getQueryState().getLineageState());
}
FetchTask fetchTask = parseContext.getFetchTask();
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 28a3374..49fe540 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
-import org.apache.hadoop.hive.ql.session.LineageState;
/**
* MoveWork.
@@ -39,13 +38,6 @@ public class MoveWork implements Serializable {
private LoadTableDesc loadTableWork;
private LoadFileDesc loadFileWork;
private LoadMultiFilesDesc loadMultiFilesWork;
- /*
- these are sessionState objects that are copied over to work to allow for parallel execution.
- based on the current use case the methods are selectively synchronized, which might need to be
- taken care when using other methods.
- */
- private final LineageState sessionStateLineageState;
-
private boolean checkFileFormat;
private boolean srcLocal;
@@ -65,21 +57,18 @@ public class MoveWork implements Serializable {
private boolean isNoop;
public MoveWork() {
- sessionStateLineageState = null;
}
- private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
- LineageState lineageState) {
+ private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
this.inputs = inputs;
this.outputs = outputs;
- sessionStateLineageState = lineageState;
}
public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
- boolean checkFileFormat, boolean srcLocal, LineageState lineageState) {
- this(inputs, outputs, lineageState);
+ boolean checkFileFormat, boolean srcLocal) {
+ this(inputs, outputs);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("Creating MoveWork " + System.identityHashCode(this)
+ " with " + loadTableWork + "; " + loadFileWork);
@@ -92,8 +81,8 @@ public class MoveWork implements Serializable {
public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
- boolean checkFileFormat, LineageState lineageState) {
- this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false, lineageState);
+ boolean checkFileFormat) {
+ this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false);
}
public MoveWork(final MoveWork o) {
@@ -104,7 +93,6 @@ public class MoveWork implements Serializable {
srcLocal = o.isSrcLocal();
inputs = o.getInputs();
outputs = o.getOutputs();
- sessionStateLineageState = o.sessionStateLineageState;
}
@Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -166,7 +154,4 @@ public class MoveWork implements Serializable {
this.srcLocal = srcLocal;
}
- public LineageState getLineagState() {
- return sessionStateLineageState;
- }
}
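
With the field gone, MoveWork is a plain description of work again, and the (misspelled) getLineagState() accessor disappears along with it. The two load-carrying public constructors that remain, as a usage sketch (inputs, outputs, ltd and isLocal assumed in scope):

    import org.apache.hadoop.hive.ql.plan.MoveWork;

    // Table load, non-local source: the common case in the analyzers above.
    MoveWork mw = new MoveWork(inputs, outputs, ltd, null,
        /* checkFileFormat */ false);
    // Local-source variant, as used by LoadSemanticAnalyzer.
    MoveWork mwLocal = new MoveWork(inputs, outputs, ltd, null,
        /* checkFileFormat */ true, /* srcLocal */ isLocal);
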
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
index 056d614..82eeb35 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
@@ -60,7 +60,7 @@ public class LineageState implements Serializable {
/**
* Constructor.
*/
- LineageState() {
+ public LineageState() {
dirToFop = new HashMap<>();
linfo = new LineageInfo();
index = new Index();
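
Widening the constructor to public is the ownership transfer in miniature: LineageState was package-private and constructed once per SessionState, and now any per-query holder (or a test) can create its own. Sketch:

    import org.apache.hadoop.hive.ql.session.LineageState;

    // One lineage state per query rather than per session; compilation code
    // reads it back via queryState.getLineageState(), as in the hunks above.
    LineageState perQueryLineage = new LineageState();
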
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index bb6ddc6..d03f5e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -232,11 +232,6 @@ public class SessionState {
*/
private Map<URI, HadoopShims.HdfsEncryptionShim> hdfsEncryptionShims = Maps.newHashMap();
- /**
- * Lineage state.
- */
- LineageState ls;
-
private final String userName;
/**
@@ -294,15 +289,6 @@ public class SessionState {
private List<Closeable> cleanupItems = new LinkedList<Closeable>();
- /**
- * Get the lineage state stored in this session.
- *
- * @return LineageState
- */
- public LineageState getLineageState() {
- return ls;
- }
-
public HiveConf getConf() {
return sessionConf;
}
@@ -387,7 +373,6 @@ public class SessionState {
LOG.debug("SessionState user: " + userName);
}
isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT);
- ls = new LineageState();
resourceMaps = new ResourceMaps();
// Must be deterministic order map for consistent q-test output across Java versions
overriddenConfigurations = new LinkedHashMap<String, String>();
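
Deleting the session-level field and accessor outright, rather than deprecating them, means any leftover caller breaks at compile time instead of silently sharing lineage across concurrent queries. Migration sketch for such a caller (queryState assumed in scope):

    // Before this commit (removed):
    //   LineageState ls = SessionState.get().getLineageState();
    // After: use the query-scoped state, as throughout the hunks above.
    LineageState ls = queryState.getLineageState();
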
http://git-wip-us.apache.org/repos/asf/hive/blob/646ccce8/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
index 3406892..3c007a7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.LineageState;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -135,9 +136,10 @@ public class TestGenMapRedUtilsCreateConditionalTask {
public void testMergePathWithInvalidMoveWorkThrowsException() {
final Path condInputPath = new Path("s3a://bucket/scratch/-ext-10000");
final MoveWork mockWork = mock(MoveWork.class);
+ final LineageState lineageState = new LineageState();
when(mockWork.getLoadMultiFilesWork()).thenReturn(new LoadMultiFilesDesc());
- GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+ GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
}
@Test
@@ -146,12 +148,13 @@ public class TestGenMapRedUtilsCreateConditionalTask {
final Path condOutputPath = new Path("s3a://bucket/scratch/-ext-10002");
final Path targetMoveWorkPath = new Path("s3a://bucket/scratch/-ext-10003");
final MoveWork mockWork = mock(MoveWork.class);
+ final LineageState lineageState = new LineageState();
MoveWork newWork;
// test using loadFileWork
when(mockWork.getLoadFileWork()).thenReturn(new LoadFileDesc(
condOutputPath, targetMoveWorkPath, false, "", "", false));
- newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+ newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
assertNotNull(newWork);
assertNotEquals(newWork, mockWork);
assertEquals(condInputPath, newWork.getLoadFileWork().getSourcePath());
@@ -162,7 +165,7 @@ public class TestGenMapRedUtilsCreateConditionalTask {
reset(mockWork);
when(mockWork.getLoadTableWork()).thenReturn(new LoadTableDesc(
condOutputPath, tableDesc, null, null));
- newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork);
+ newWork = GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
assertNotNull(newWork);
assertNotEquals(newWork, mockWork);
assertEquals(condInputPath, newWork.getLoadTableWork().getSourcePath());
@@ -181,7 +184,8 @@ public class TestGenMapRedUtilsCreateConditionalTask {
Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);
- GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+ GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+ moveTaskList, hiveConf, dummyMRTask, new LineageState());
ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
@@ -221,7 +225,8 @@ public class TestGenMapRedUtilsCreateConditionalTask {
Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);
- GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+ GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+ moveTaskList, hiveConf, dummyMRTask, new LineageState());
ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
@@ -255,7 +260,8 @@ public class TestGenMapRedUtilsCreateConditionalTask {
Task<MoveWork> moveTask = createMoveTask(finalDirName, tableLocation);
List<Task<MoveWork>> moveTaskList = Collections.singletonList(moveTask);
- GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null, moveTaskList, hiveConf, dummyMRTask);
+ GenMapRedUtils.createMRWorkForMergingFiles(fileSinkOperator, finalDirName, null,
+ moveTaskList, hiveConf, dummyMRTask, new LineageState());
ConditionalTask conditionalTask = (ConditionalTask)dummyMRTask.getChildTasks().get(0);
Task<? extends Serializable> moveOnlyTask = conditionalTask.getListTasks().get(0);
Task<? extends Serializable> mergeOnlyTask = conditionalTask.getListTasks().get(1);
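
In the tests the new parameter is satisfied with a fresh instance, which also keeps each case hermetic: no lineage bookkeeping leaks between tests through a shared session. The pattern, sketched with the mocks and paths as in the hunks above:

    import org.apache.hadoop.hive.ql.session.LineageState;

    // A throwaway per-test LineageState isolates lineage side effects.
    final LineageState lineageState = new LineageState();
    MoveWork newWork =
        GenMapRedUtils.mergeMovePaths(condInputPath, mockWork, lineageState);
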