You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/04/09 04:18:58 UTC
svn commit: r1585879 - in /hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: DagUtils.java TezSessionState.java TezTask.java
Author: sershe
Date: Wed Apr 9 02:18:57 2014
New Revision: 1585879
URL: http://svn.apache.org/r1585879
Log:
HIVE-6825 : custom jars for Hive query should be uploaded to scratch dir per query; and/or versioned (Sergey Shelukhin, reviewed by Vikram Dixit K)
Modified:
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1585879&r1=1585878&r2=1585879&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Wed Apr 9 02:18:57 2014
@@ -603,16 +603,7 @@ public class DagUtils {
Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
combinedResources.putAll(sessionConfig.getSessionResources());
-
- try {
- for(LocalResource lr : localizeTempFilesFromConf(getHiveJarDirectory(conf), conf)) {
- combinedResources.put(getBaseName(lr), lr);
- }
- } catch(LoginException le) {
- throw new IOException(le);
- }
-
- if(localResources != null) {
+ if (localResources != null) {
combinedResources.putAll(localResources);
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1585879&r1=1585878&r2=1585879&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Wed Apr 9 02:18:57 2014
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -75,7 +76,8 @@ public class TezSessionState {
private boolean defaultQueue = false;
private String user;
- private HashSet<String> additionalAmFiles = null;
+ private HashSet<String> additionalFilesNotFromConf = null;
+ private List<LocalResource> localizedResources;
private static List<TezSessionState> openSessions
= Collections.synchronizedList(new LinkedList<TezSessionState>());
@@ -129,7 +131,7 @@ public class TezSessionState {
* @throws LoginException
* @throws TezException
*/
- public void open(HiveConf conf, List<LocalResource> additionalLr)
+ public void open(HiveConf conf, String[] additionalFiles)
throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
this.conf = conf;
@@ -141,6 +143,22 @@ public class TezSessionState {
// create the tez tmp dir
tezScratchDir = createTezDir(sessionId);
+ String dir = tezScratchDir.toString();
+ // Localize resources to session scratch dir
+ localizedResources = utils.localizeTempFilesFromConf(dir, conf);
+ List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf, additionalFiles);
+ if (handlerLr != null) {
+ if (localizedResources == null) {
+ localizedResources = handlerLr;
+ } else {
+ localizedResources.addAll(handlerLr);
+ }
+ additionalFilesNotFromConf = new HashSet<String>();
+ for (String originalFile : additionalFiles) {
+ additionalFilesNotFromConf.add(originalFile);
+ }
+ }
+
// generate basic tez config
TezConfiguration tezConfig = new TezConfiguration(conf);
@@ -153,12 +171,9 @@ public class TezSessionState {
// configuration for the application master
Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
- if (additionalLr != null) {
- additionalAmFiles = new HashSet<String>();
- for (LocalResource lr : additionalLr) {
- String baseName = utils.getBaseName(lr);
- additionalAmFiles.add(baseName);
- commonLocalResources.put(baseName, lr);
+ if (localizedResources != null) {
+ for (LocalResource lr : localizedResources) {
+ commonLocalResources.put(utils.getBaseName(lr), lr);
}
}
@@ -172,9 +187,10 @@ public class TezSessionState {
TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConfig);
// and finally we're ready to create and start the session
- session = new TezSession("HIVE-"+sessionId, sessionConfig);
+ session = new TezSession("HIVE-" + sessionId, sessionConfig);
- LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")");
+ LOG.info("Opening new Tez Session (id: " + sessionId
+ + ", scratch dir: " + tezScratchDir + ")");
session.start();
@@ -200,11 +216,11 @@ public class TezSessionState {
openSessions.add(this);
}
- public boolean hasResources(List<LocalResource> lrs) {
- if (lrs == null || lrs.isEmpty()) return true;
- if (additionalAmFiles == null || additionalAmFiles.isEmpty()) return false;
- for (LocalResource lr : lrs) {
- if (!additionalAmFiles.contains(utils.getBaseName(lr))) return false;
+ public boolean hasResources(String[] localAmResources) {
+ if (localAmResources == null || localAmResources.length == 0) return true;
+ if (additionalFilesNotFromConf == null || additionalFilesNotFromConf.isEmpty()) return false;
+ for (String s : localAmResources) {
+ if (!additionalFilesNotFromConf.contains(s)) return false;
}
return true;
}
@@ -236,7 +252,8 @@ public class TezSessionState {
tezScratchDir = null;
conf = null;
appJarLr = null;
- additionalAmFiles = null;
+ additionalFilesNotFromConf = null;
+ localizedResources = null;
}
public void cleanupScratchDir () throws IOException {
@@ -392,6 +409,10 @@ public class TezSessionState {
return conf;
}
+ public List<LocalResource> getLocalizedResources() {
+ return localizedResources;
+ }
+
public String getUser() {
return user;
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1585879&r1=1585878&r2=1585879&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Wed Apr 9 02:18:57 2014
@@ -130,31 +130,22 @@ public class TezTask extends Task<TezWor
// create the tez tmp dir
scratchDir = utils.createTezDir(scratchDir, conf);
- // we need to get the user specified local resources for this dag
- String hiveJarDir = utils.getHiveJarDirectory(conf);
- List<LocalResource> additionalLr = utils.localizeTempFilesFromConf(hiveJarDir, conf);
- List<LocalResource> handlerLr = utils.localizeTempFiles(hiveJarDir, conf, inputOutputJars);
- if (handlerLr != null) {
- additionalLr.addAll(handlerLr);
- }
-
// If we have any jars from input format, we need to restart the session because
// AM will need them; so, AM has to be restarted. What a mess...
- if (!session.hasResources(handlerLr)) {
- if (session.isOpen()) {
- LOG.info("Tez session being reopened to pass custom jars to AM");
- session.close(false);
- session = TezSessionPoolManager.getInstance().getSession(null, conf, false);
- ss.setTezSession(session);
- }
- session.open(conf, additionalLr);
+ if (!session.hasResources(inputOutputJars) && session.isOpen()) {
+ LOG.info("Tez session being reopened to pass custom jars to AM");
+ session.close(false);
+ session = TezSessionPoolManager.getInstance().getSession(null, conf, false);
+ ss.setTezSession(session);
}
+
if (!session.isOpen()) {
// can happen if the user sets the tez flag after the session was
// established
LOG.info("Tez session hasn't been created yet. Opening session");
- session.open(conf);
+ session.open(conf, inputOutputJars);
}
+ List<LocalResource> additionalLr = session.getLocalizedResources();
// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.