You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/04/08 00:18:55 UTC
svn commit: r1585607 - in /hive/branches/branch-0.13/ql/src:
java/org/apache/hadoop/hive/ql/exec/tez/
java/org/apache/hadoop/hive/ql/optimizer/
java/org/apache/hadoop/hive/ql/plan/
java/org/apache/hadoop/hive/ql/session/ test/org/apache/hadoop/hive/ql/...
Author: sershe
Date: Mon Apr 7 22:18:54 2014
New Revision: 1585607
URL: http://svn.apache.org/r1585607
Log:
HIVE-6739 : Hive HBase query fails on Tez due to missing jars and then due to NPE in getSplits (Sergey Shelukhin, reviewed by Vikram Dixit K)
Modified:
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Apr 7 22:18:54 2014
@@ -604,7 +604,7 @@ public class DagUtils {
combinedResources.putAll(sessionConfig.getSessionResources());
try {
- for(LocalResource lr : localizeTempFiles(conf)) {
+ for(LocalResource lr : localizeTempFilesFromConf(getHiveJarDirectory(conf), conf)) {
combinedResources.put(getBaseName(lr), lr);
}
} catch(LoginException le) {
@@ -665,7 +665,8 @@ public class DagUtils {
* @throws IOException when hdfs operation fails
* @throws LoginException when getDefaultDestDir fails with the same exception
*/
- public List<LocalResource> localizeTempFiles(Configuration conf) throws IOException, LoginException {
+ public List<LocalResource> localizeTempFilesFromConf(
+ String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
List<LocalResource> tmpResources = new ArrayList<LocalResource>();
String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
@@ -683,15 +684,32 @@ public class DagUtils {
String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
- // need to localize the additional jars and files
+ String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives;
+ addTempFiles(conf, tmpResources, hdfsDirPathStr, allFiles.split(","));
+ return tmpResources;
+ }
- // we need the directory on hdfs to which we shall put all these files
- // Use HIVE_JAR_DIRECTORY only if it's set explicitly; otherwise use default directory
- String hdfsDirPathStr = getHiveJarDirectory(conf);
+ /**
+ * Localizes files, archives and jars from a provided array of names.
+ * @param hdfsDirPathStr Destination directory in HDFS.
+ * @param conf Configuration.
+ * @param inputOutputJars The file names to localize.
+ * @return List<LocalResource> local resources to add to execution
+ * @throws IOException when hdfs operation fails.
+ * @throws LoginException when getDefaultDestDir fails with the same exception
+ */
+ public List<LocalResource> localizeTempFiles(String hdfsDirPathStr, Configuration conf,
+ String[] inputOutputJars) throws IOException, LoginException {
+ if (inputOutputJars == null) return null;
+ List<LocalResource> tmpResources = new ArrayList<LocalResource>();
+ addTempFiles(conf, tmpResources, hdfsDirPathStr, inputOutputJars);
+ return tmpResources;
+ }
- String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives;
- String[] allFilesArr = allFiles.split(",");
- for (String file : allFilesArr) {
+ private void addTempFiles(Configuration conf,
+ List<LocalResource> tmpResources, String hdfsDirPathStr,
+ String[] files) throws IOException {
+ for (String file : files) {
if (!StringUtils.isNotBlank(file)) {
continue;
}
@@ -700,8 +718,6 @@ public class DagUtils {
new Path(hdfsFilePathStr), conf);
tmpResources.add(localResource);
}
-
- return tmpResources;
}
public String getHiveJarDirectory(Configuration conf) throws IOException, LoginException {
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Mon Apr 7 22:18:54 2014
@@ -64,7 +64,7 @@ public class TezSessionPoolManager {
HiveConf newConf = new HiveConf(initConf);
TezSessionState sessionState = defaultQueuePool.take();
newConf.set("tez.queue.name", sessionState.getQueueName());
- sessionState.open(TezSessionState.makeSessionId(), newConf);
+ sessionState.open(newConf);
defaultQueuePool.put(sessionState);
}
}
@@ -91,7 +91,7 @@ public class TezSessionPoolManager {
if (queue.length() == 0) {
continue;
}
- TezSessionState sessionState = createSession();
+ TezSessionState sessionState = createSession(TezSessionState.makeSessionId());
sessionState.setQueueName(queue);
sessionState.setDefault();
LOG.info("Created new tez session for queue: " + queue +
@@ -102,7 +102,7 @@ public class TezSessionPoolManager {
}
}
- private TezSessionState getSession(HiveConf conf)
+ private TezSessionState getSession(HiveConf conf, boolean doOpen)
throws Exception {
String queueName = conf.get("tez.queue.name");
@@ -120,7 +120,7 @@ public class TezSessionPoolManager {
LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser +
" defaultQueuePool: " + defaultQueuePool +
" blockingQueueLength: " + blockingQueueLength);
- return getNewSessionState(conf, queueName);
+ return getNewSessionState(conf, queueName, doOpen);
}
LOG.info("Choosing a session from the defaultQueuePool");
@@ -130,16 +130,21 @@ public class TezSessionPoolManager {
/**
* @param conf HiveConf that is used to initialize the session
* @param queueName could be null. Set in the tez session.
+ * @param doOpen whether to open the new session before returning it
* @return
* @throws Exception
*/
private TezSessionState getNewSessionState(HiveConf conf,
- String queueName) throws Exception {
- TezSessionState retTezSessionState = createSession();
+ String queueName, boolean doOpen) throws Exception {
+ TezSessionState retTezSessionState = createSession(TezSessionState.makeSessionId());
retTezSessionState.setQueueName(queueName);
- retTezSessionState.open(TezSessionState.makeSessionId(), conf);
+ String what = "Created";
+ if (doOpen) {
+ retTezSessionState.open(conf);
+ what = "Started";
+ }
- LOG.info("Started a new session for queue: " + queueName +
+ LOG.info(what + " a new session for queue: " + queueName +
" session id: " + retTezSessionState.getSessionId());
return retTezSessionState;
}
@@ -179,11 +184,12 @@ public class TezSessionPoolManager {
}
}
- protected TezSessionState createSession() {
- return new TezSessionState();
+ protected TezSessionState createSession(String sessionId) {
+ return new TezSessionState(sessionId);
}
- public TezSessionState getSession(TezSessionState session, HiveConf conf) throws Exception {
+ public TezSessionState getSession(
+ TezSessionState session, HiveConf conf, boolean doOpen) throws Exception {
if (canWorkWithSameSession(session, conf)) {
return session;
}
@@ -192,7 +198,7 @@ public class TezSessionPoolManager {
session.close(false);
}
- return getSession(conf);
+ return getSession(conf, doOpen);
}
/*
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Apr 7 22:18:54 2014
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -68,6 +69,8 @@ public class TezSessionState {
private String queueName;
private boolean defaultQueue = false;
+ private HashSet<String> additionalAmFiles = null;
+
private static List<TezSessionState> openSessions
= Collections.synchronizedList(new LinkedList<TezSessionState>());
@@ -83,8 +86,9 @@ public class TezSessionState {
* Constructor. We do not automatically connect, because we only want to
* load tez classes when the user has tez installed.
*/
- public TezSessionState() {
+ public TezSessionState(String sessionId) {
this(DagUtils.getInstance());
+ this.sessionId = sessionId;
}
/**
@@ -106,6 +110,11 @@ public class TezSessionState {
return UUID.randomUUID().toString();
}
+ public void open(HiveConf conf)
+ throws IOException, LoginException, URISyntaxException, TezException {
+ open(conf, null);
+ }
+
/**
* Creates a tez session. A session is tied to either a cli/hs2 session. You can
* submit multiple DAGs against a session (as long as they are executed serially).
@@ -114,10 +123,8 @@ public class TezSessionState {
* @throws LoginException
* @throws TezException
*/
- public void open(String sessionId, HiveConf conf)
- throws IOException, LoginException, URISyntaxException, TezException {
-
- this.sessionId = sessionId;
+ public void open(HiveConf conf, List<LocalResource> additionalLr)
+ throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
this.conf = conf;
// create the tez tmp dir
@@ -135,6 +142,14 @@ public class TezSessionState {
// configuration for the application master
Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
+ if (additionalLr != null) {
+ additionalAmFiles = new HashSet<String>();
+ for (LocalResource lr : additionalLr) {
+ String baseName = utils.getBaseName(lr);
+ additionalAmFiles.add(baseName);
+ commonLocalResources.put(baseName, lr);
+ }
+ }
// Create environment for AM.
Map<String, String> amEnv = new HashMap<String, String>();
@@ -174,6 +189,15 @@ public class TezSessionState {
openSessions.add(this);
}
+ public boolean hasResources(List<LocalResource> lrs) {
+ if (lrs == null || lrs.isEmpty()) return true;
+ if (additionalAmFiles == null || additionalAmFiles.isEmpty()) return false;
+ for (LocalResource lr : lrs) {
+ if (!additionalAmFiles.contains(utils.getBaseName(lr))) return false;
+ }
+ return true;
+ }
+
/**
* Close a tez session. Will cleanup any tez/am related resources. After closing a session
* no further DAGs can be executed against it.
@@ -202,6 +226,7 @@ public class TezSessionState {
tezScratchDir = null;
conf = null;
appJarLr = null;
+ additionalAmFiles = null;
}
public String getSessionId() {
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Apr 7 22:18:54 2014
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -103,45 +104,62 @@ public class TezTask extends Task<TezWor
TezSessionState session = null;
try {
- // Get or create Context object. If we create it we have to clean
- // it later as well.
+ // Get or create Context object. If we create it we have to clean it later as well.
ctx = driverContext.getCtx();
if (ctx == null) {
ctx = new Context(conf);
cleanContext = true;
}
- // Need to remove this static hack. But this is the way currently to
- // get a session.
+ // Need to remove this static hack. But this is the way currently to get a session.
SessionState ss = SessionState.get();
session = ss.getTezSession();
- session = TezSessionPoolManager.getInstance().getSession(session, conf);
+ session = TezSessionPoolManager.getInstance().getSession(session, conf, false);
ss.setTezSession(session);
- // if it's not running start it.
- if (!session.isOpen()) {
- // can happen if the user sets the tez flag after the session was
- // established
- LOG.info("Tez session hasn't been created yet. Opening session");
- session.open(session.getSessionId(), conf);
- }
+ // jobConf will hold all the configuration for hadoop, tez, and hive
+ JobConf jobConf = utils.createConfiguration(conf);
+
+ // Get all user jars from work (e.g. input format stuff).
+ String[] inputOutputJars = work.configureJobConfAndExtractJars(jobConf);
// we will localize all the files (jars, plans, hashtables) to the
- // scratch dir. let's create this first.
+ // scratch dir. let's create this and tmp first.
Path scratchDir = ctx.getMRScratchDir();
-
- // create the tez tmp dir
utils.createTezDir(scratchDir, conf);
- // jobConf will hold all the configuration for hadoop, tez, and hive
- JobConf jobConf = utils.createConfiguration(conf);
+ // we need to get the user specified local resources for this dag
+ String hiveJarDir = utils.getHiveJarDirectory(conf);
+ List<LocalResource> additionalLr = utils.localizeTempFilesFromConf(hiveJarDir, conf);
+ List<LocalResource> handlerLr = utils.localizeTempFiles(hiveJarDir, conf, inputOutputJars);
+ if (handlerLr != null) {
+ additionalLr.addAll(handlerLr);
+ }
+
+ // If we have any jars from input format, we need to restart the session because
+ // AM will need them; so, AM has to be restarted. What a mess...
+ if (!session.hasResources(handlerLr)) {
+ if (session.isOpen()) {
+ LOG.info("Tez session being reopened to pass custom jars to AM");
+ session.close(false);
+ session = TezSessionPoolManager.getInstance().getSession(null, conf, false);
+ ss.setTezSession(session);
+ }
+ session.open(conf, additionalLr);
+ }
+ if (!session.isOpen()) {
+ // can happen if the user sets the tez flag after the session was
+ // established
+ LOG.info("Tez session hasn't been created yet. Opening session");
+ session.open(conf);
+ }
// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.
LocalResource appJarLr = session.getAppJarLr();
// next we translate the TezWork to a Tez DAG
- DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
+ DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
// submit will send the job to the cluster and start executing
client = submit(jobConf, dag, scratchDir, appJarLr, session);
@@ -186,16 +204,13 @@ public class TezTask extends Task<TezWor
}
DAG build(JobConf conf, TezWork work, Path scratchDir,
- LocalResource appJarLr, Context ctx)
+ LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx)
throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>();
Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();
- // we need to get the user specified local resources for this dag
- List<LocalResource> additionalLr = utils.localizeTempFiles(conf);
-
// getAllWork returns a topologically sorted list, which we use to make
// sure that vertices are created before they are used in edges.
List<BaseWork> ws = work.getAllWork();
@@ -299,7 +314,7 @@ public class TezTask extends Task<TezWor
sessionState.close(true);
// (re)open the session
- sessionState.open(sessionState.getSessionId(), this.conf);
+ sessionState.open(this.conf);
console.printInfo("Session re-established.");
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Apr 7 22:18:54 2014
@@ -845,6 +845,13 @@ public final class GenMapRedUtils {
setKeyAndValueDesc(work.getReduceWork(), op);
}
}
+ } else if (task != null && (task.getWork() instanceof TezWork)) {
+ TezWork work = (TezWork)task.getWork();
+ for (BaseWork w : work.getAllWorkUnsorted()) {
+ if (w instanceof MapWork) {
+ ((MapWork)w).deriveExplainAttributes();
+ }
+ }
}
if (task.getChildTasks() == null) {
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Mon Apr 7 22:18:54 2014
@@ -28,6 +28,7 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.mapred.JobConf;
/**
* BaseWork. Base class for any "work" that's being done on the cluster. Items like stats
@@ -106,4 +107,6 @@ public abstract class BaseWork extends A
return returnSet;
}
+
+ public abstract void configureJobConf(JobConf job);
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Mon Apr 7 22:18:54 2014
@@ -172,6 +172,8 @@ public class MapWork extends BaseWork {
/**
* Derive additional attributes to be rendered by EXPLAIN.
+ * TODO: this method is relied upon by custom input formats to set jobconf properties.
+ * This is madness? - This is Hive Storage Handlers!
*/
public void deriveExplainAttributes() {
if (pathToPartitionInfo != null) {
@@ -495,6 +497,7 @@ public class MapWork extends BaseWork {
samplingType == 2 ? "SAMPLING_ON_START" : null;
}
+ @Override
public void configureJobConf(JobConf job) {
for (PartitionDesc partition : aliasToPartnInfo.values()) {
PlanUtils.configureJobConf(partition.getTableDesc(), job);
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java Mon Apr 7 22:18:54 2014
@@ -33,6 +33,7 @@ import org.apache.commons.lang3.tuple.Pa
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.dag.api.EdgeProperty;
/**
@@ -93,6 +94,10 @@ public class TezWork extends AbstractOpe
return result;
}
+ public Collection<BaseWork> getAllWorkUnsorted() {
+ return workGraph.keySet();
+ }
+
private void visit(BaseWork child, Set<BaseWork> seen, List<BaseWork> result) {
if (seen.contains(child)) {
@@ -271,6 +276,37 @@ public class TezWork extends AbstractOpe
}
return result;
}
+
+ private static final String MR_JAR_PROPERTY = "tmpjars";
+ /**
+ * Calls configureJobConf on instances of work that are part of this TezWork.
+ * Uses the passed job configuration to extract "tmpjars" added by these, so that Tez
+ * could add them to the job the proper Tez way. This is a very hacky way but currently
+ * there's no good way to get these JARs - both storage handler interface, and HBase
+ * code, would have to change to get the list directly (right now it adds to tmpjars).
+ * This will happen in 0.14 hopefully.
+ * @param jobConf Job configuration.
+ * @return List of files added to tmpjars by storage handlers.
+ */
+ public String[] configureJobConfAndExtractJars(JobConf jobConf) {
+ String[] oldTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
+ jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
+ for (BaseWork work : workGraph.keySet()) {
+ work.configureJobConf(jobConf);
+ }
+ String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
+ if (oldTmpJars != null && (oldTmpJars.length != 0)) {
+ if (newTmpJars != null && (newTmpJars.length != 0)) {
+ String[] combinedTmpJars = new String[newTmpJars.length + oldTmpJars.length];
+ System.arraycopy(oldTmpJars, 0, combinedTmpJars, 0, oldTmpJars.length);
+ System.arraycopy(newTmpJars, 0, combinedTmpJars, oldTmpJars.length, newTmpJars.length);
+ jobConf.setStrings(MR_JAR_PROPERTY, combinedTmpJars);
+ } else {
+ jobConf.setStrings(MR_JAR_PROPERTY, oldTmpJars);
+ }
+ }
+ return newTmpJars;
+ }
/**
* connect adds an edge between a and b. Both nodes have
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java Mon Apr 7 22:18:54 2014
@@ -28,6 +28,7 @@ import java.util.HashSet;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.mapred.JobConf;
/**
* Simple wrapper for union all cases. All contributing work for a union all
@@ -68,4 +69,7 @@ public class UnionWork extends BaseWork
public Set<UnionOperator> getUnionOperators() {
return unionOperators;
}
+
+ public void configureJobConf(JobConf job) {
+ }
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Apr 7 22:18:54 2014
@@ -346,9 +346,9 @@ public class SessionState {
.equals("tez") && (startSs.isHiveServerQuery == false)) {
try {
if (startSs.tezSessionState == null) {
- startSs.tezSessionState = new TezSessionState();
+ startSs.tezSessionState = new TezSessionState(startSs.getSessionId());
}
- startSs.tezSessionState.open(startSs.getSessionId(), startSs.conf);
+ startSs.tezSessionState.open(startSs.conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -811,10 +811,10 @@ public class SessionState {
}
public Set<String> list_resource(ResourceType t, List<String> filter) {
- if (resource_map.get(t) == null) {
+ Set<String> orig = resource_map.get(t);
+ if (orig == null) {
return null;
}
- Set<String> orig = resource_map.get(t);
if (filter == null) {
return orig;
} else {
Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java Mon Apr 7 22:18:54 2014
@@ -40,9 +40,9 @@ public class TestTezSessionPool {
}
@Override
- public TezSessionState createSession() {
- return new TestTezSessionState();
- }
+ public TezSessionState createSession(String sessionId) {
+ return new TestTezSessionState(sessionId);
+ }
}
@Before
@@ -54,8 +54,8 @@ public class TestTezSessionPool {
public void testGetNonDefaultSession() {
poolManager = new TestTezSessionPoolManager();
try {
- TezSessionState sessionState = poolManager.getSession(null, conf);
- TezSessionState sessionState1 = poolManager.getSession(sessionState, conf);
+ TezSessionState sessionState = poolManager.getSession(null, conf, true);
+ TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true);
if (sessionState1 != sessionState) {
fail();
}
@@ -75,25 +75,25 @@ public class TestTezSessionPool {
poolManager = new TestTezSessionPoolManager();
poolManager.setupPool(conf);
poolManager.startPool();
- TezSessionState sessionState = poolManager.getSession(null, conf);
+ TezSessionState sessionState = poolManager.getSession(null, conf, true);
if (sessionState.getQueueName().compareTo("a") != 0) {
fail();
}
poolManager.returnSession(sessionState);
- sessionState = poolManager.getSession(null, conf);
+ sessionState = poolManager.getSession(null, conf, true);
if (sessionState.getQueueName().compareTo("b") != 0) {
fail();
}
poolManager.returnSession(sessionState);
- sessionState = poolManager.getSession(null, conf);
+ sessionState = poolManager.getSession(null, conf, true);
if (sessionState.getQueueName().compareTo("c") != 0) {
fail();
}
poolManager.returnSession(sessionState);
- sessionState = poolManager.getSession(null, conf);
+ sessionState = poolManager.getSession(null, conf, true);
if (sessionState.getQueueName().compareTo("a") != 0) {
fail();
}
@@ -118,7 +118,7 @@ public class TestTezSessionPool {
tmpConf.set("tez.queue.name", "");
}
- TezSessionState session = poolManager.getSession(null, tmpConf);
+ TezSessionState session = poolManager.getSession(null, tmpConf, true);
Thread.sleep((random.nextInt(9) % 10) * 1000);
poolManager.returnSession(session);
} catch (Exception e) {
Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java Mon Apr 7 22:18:54 2014
@@ -38,6 +38,11 @@ public class TestTezSessionState extends
private String sessionId;
private HiveConf hiveConf;
+ public TestTezSessionState(String sessionId) {
+ super(sessionId);
+ this.sessionId = sessionId;
+ }
+
@Override
public boolean isOpen() {
return open;
@@ -48,11 +53,9 @@ public class TestTezSessionState extends
}
@Override
- public void open(String sessionId, HiveConf conf) throws IOException,
- LoginException, URISyntaxException, TezException {
- this.sessionId = sessionId;
- this.hiveConf = conf;
- }
+ public void open(HiveConf conf) {
+ this.hiveConf = conf;
+ }
@Override
public void close(boolean keepTmpDir) throws TezException, IOException {
Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1585607&r1=1585606&r2=1585607&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Mon Apr 7 22:18:54 2014
@@ -33,6 +33,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import javax.security.auth.login.LoginException;
@@ -176,7 +177,7 @@ public class TestTezTask {
@Test
public void testBuildDag() throws IllegalArgumentException, IOException, Exception {
- DAG dag = task.build(conf, work, path, appLr, new Context(conf));
+ DAG dag = task.build(conf, work, path, appLr, null, new Context(conf));
for (BaseWork w: work.getAllWork()) {
Vertex v = dag.getVertex(w.getName());
assertNotNull(v);
@@ -196,7 +197,7 @@ public class TestTezTask {
@Test
public void testEmptyWork() throws IllegalArgumentException, IOException, Exception {
- DAG dag = task.build(conf, new TezWork(""), path, appLr, new Context(conf));
+ DAG dag = task.build(conf, new TezWork(""), path, appLr, null, new Context(conf));
assertEquals(dag.getVertices().size(), 0);
}
@@ -206,7 +207,7 @@ public class TestTezTask {
DAG dag = new DAG("test");
task.submit(conf, dag, path, appLr, sessionState);
// validate close/reopen
- verify(sessionState, times(1)).open(any(String.class), any(HiveConf.class));
+ verify(sessionState, times(1)).open(any(HiveConf.class));
verify(sessionState, times(1)).close(eq(true));
verify(session, times(2)).submitDAG(any(DAG.class));
}