You are viewing a plain-text version of this content. The canonical version is the original message in the commits@hive.apache.org mailing-list archive.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/06/16 20:28:21 UTC
svn commit: r1602951 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/tez/
java/org/apache/hadoop/hive/ql/session/
test/org/apache/hadoop/hive/ql/exec/tez/
Author: gunther
Date: Mon Jun 16 18:28:20 2014
New Revision: 1602951
URL: http://svn.apache.org/r1602951
Log:
HIVE-7212: Use resource re-localization instead of restarting sessions in Tez (Gunther Hagleitner, reviewed by Vikram Dixit, Gopal V, Sid Seth)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1602951&r1=1602950&r2=1602951&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Jun 16 18:28:20 2014
@@ -845,7 +845,7 @@ public class DagUtils {
}
return createLocalResource(destFS, dest, LocalResourceType.FILE,
- LocalResourceVisibility.APPLICATION);
+ LocalResourceVisibility.PRIVATE);
}
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1602951&r1=1602950&r2=1602951&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Jun 16 18:28:20 2014
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import javax.security.auth.login.LoginException;
@@ -76,8 +77,8 @@ public class TezSessionState {
private boolean defaultQueue = false;
private String user;
- private HashSet<String> additionalFilesNotFromConf = null;
- private List<LocalResource> localizedResources;
+ private final Set<String> additionalFilesNotFromConf = new HashSet<String>();
+ private final Set<LocalResource> localizedResources = new HashSet<LocalResource>();
private static List<TezSessionState> openSessions
= Collections.synchronizedList(new LinkedList<TezSessionState>());
@@ -143,22 +144,15 @@ public class TezSessionState {
// create the tez tmp dir
tezScratchDir = createTezDir(sessionId);
- String dir = tezScratchDir.toString();
- // Localize resources to session scratch dir
- localizedResources = utils.localizeTempFilesFromConf(dir, conf);
- List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf, additionalFiles);
- if (handlerLr != null) {
- if (localizedResources == null) {
- localizedResources = handlerLr;
- } else {
- localizedResources.addAll(handlerLr);
- }
- additionalFilesNotFromConf = new HashSet<String>();
+ additionalFilesNotFromConf.clear();
+ if (additionalFiles != null) {
for (String originalFile : additionalFiles) {
additionalFilesNotFromConf.add(originalFile);
}
}
+ refreshLocalResourcesFromConf(conf);
+
// generate basic tez config
TezConfiguration tezConfig = new TezConfiguration(conf);
@@ -171,10 +165,8 @@ public class TezSessionState {
// configuration for the application master
Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
- if (localizedResources != null) {
- for (LocalResource lr : localizedResources) {
- commonLocalResources.put(utils.getBaseName(lr), lr);
- }
+ for (LocalResource lr : localizedResources) {
+ commonLocalResources.put(utils.getBaseName(lr), lr);
}
// Create environment for AM.
@@ -216,9 +208,31 @@ public class TezSessionState {
openSessions.add(this);
}
+ public void refreshLocalResourcesFromConf(HiveConf conf)
+ throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
+
+ String dir = tezScratchDir.toString();
+
+ localizedResources.clear();
+
+ // these are local resources set through add file, jar, etc
+ List<LocalResource> lrs = utils.localizeTempFilesFromConf(dir, conf);
+ if (lrs != null) {
+ localizedResources.addAll(lrs);
+ }
+
+ // these are local resources that are set through the mr "tmpjars" property
+ List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf,
+ additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]));
+
+ if (handlerLr != null) {
+ localizedResources.addAll(handlerLr);
+ }
+ }
+
public boolean hasResources(String[] localAmResources) {
if (localAmResources == null || localAmResources.length == 0) return true;
- if (additionalFilesNotFromConf == null || additionalFilesNotFromConf.isEmpty()) return false;
+ if (additionalFilesNotFromConf.isEmpty()) return false;
for (String s : localAmResources) {
if (!additionalFilesNotFromConf.contains(s)) return false;
}
@@ -252,8 +266,8 @@ public class TezSessionState {
tezScratchDir = null;
conf = null;
appJarLr = null;
- additionalFilesNotFromConf = null;
- localizedResources = null;
+ additionalFilesNotFromConf.clear();
+ localizedResources.clear();
}
public void cleanupScratchDir () throws IOException {
@@ -369,7 +383,7 @@ public class TezSessionState {
}
public List<LocalResource> getLocalizedResources() {
- return localizedResources;
+ return new ArrayList<LocalResource>(localizedResources);
}
public String getUser() {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1602951&r1=1602950&r2=1602951&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Jun 16 18:28:20 2014
@@ -120,33 +120,28 @@ public class TezTask extends Task<TezWor
// create the tez tmp dir
scratchDir = utils.createTezDir(scratchDir, conf);
- boolean hasResources = session.hasResources(inputOutputJars);
-
- if (ss.hasAddedResource()) {
- // need to re-launch session because of new added jars.
- hasResources = false;
- // reset the added resource flag for this session since we would
- // relocalize (either by restarting or relocalizing) due to the above
- // hasResources flag.
- ss.setAddedResource(false);
- }
-
- // If we have any jars from input format, we need to restart the session because
- // AM will need them; so, AM has to be restarted. What a mess...
- if (!hasResources && session.isOpen()) {
- LOG.info("Tez session being reopened to pass custom jars to AM");
- TezSessionPoolManager.getInstance().close(session);
- session = TezSessionPoolManager.getInstance().getSession(null, conf, false);
- ss.setTezSession(session);
- }
if (!session.isOpen()) {
// can happen if the user sets the tez flag after the session was
// established
LOG.info("Tez session hasn't been created yet. Opening session");
session.open(conf, inputOutputJars);
+ } else {
+ session.refreshLocalResourcesFromConf(conf);
}
+
List<LocalResource> additionalLr = session.getLocalizedResources();
+
+ // log which resources we're adding (apart from the hive exec)
+ if (LOG.isDebugEnabled()) {
+ if (additionalLr == null || additionalLr.size() == 0) {
+ LOG.debug("No local resources to process (other than hive-exec)");
+ } else {
+ for (LocalResource lr: additionalLr) {
+ LOG.debug("Adding local resource: " + lr.getResource());
+ }
+ }
+ }
// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.
@@ -156,7 +151,7 @@ public class TezTask extends Task<TezWor
DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
// submit will send the job to the cluster and start executing
- client = submit(jobConf, dag, scratchDir, appJarLr, session);
+ client = submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr);
// finally monitor will print progress until the job is done
TezJobMonitor monitor = new TezJobMonitor();
@@ -290,15 +285,23 @@ public class TezTask extends Task<TezWor
}
DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
- LocalResource appJarLr, TezSessionState sessionState)
+ LocalResource appJarLr, TezSessionState sessionState,
+ List<LocalResource> additionalLr)
throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
DAGClient dagClient = null;
+ Map<String, LocalResource> resourceMap = new HashMap<String, LocalResource>();
+ if (additionalLr != null) {
+ for (LocalResource lr: additionalLr) {
+ resourceMap.put(utils.getBaseName(lr), lr);
+ }
+ }
+
try {
// ready to start execution on the cluster
- dagClient = sessionState.getSession().submitDAG(dag);
+ dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
} catch (SessionNotRunning nr) {
console.printInfo("Tez session was closed. Reopening...");
@@ -306,7 +309,7 @@ public class TezTask extends Task<TezWor
TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
console.printInfo("Session re-established.");
- dagClient = sessionState.getSession().submitDAG(dag);
+ dagClient = sessionState.getSession().submitDAG(dag, resourceMap);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1602951&r1=1602950&r2=1602951&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Jun 16 18:28:20 2014
@@ -164,8 +164,6 @@ public class SessionState {
private final String CONFIG_AUTHZ_SETTINGS_APPLIED_MARKER =
"hive.internal.ss.authz.settings.applied.marker";
- private boolean addedResource;
-
/**
* Lineage state.
*/
@@ -1004,12 +1002,4 @@ public class SessionState {
conf.set(CONFIG_AUTHZ_SETTINGS_APPLIED_MARKER, Boolean.TRUE.toString());
}
-
- public boolean hasAddedResource() {
- return addedResource;
- }
-
- public void setAddedResource(boolean addedResouce) {
- this.addedResource = addedResouce;
- }
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1602951&r1=1602950&r2=1602951&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Mon Jun 16 18:28:20 2014
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -200,7 +201,7 @@ public class TestTezTask {
@Test
public void testSubmit() throws Exception {
DAG dag = new DAG("test");
- task.submit(conf, dag, path, appLr, sessionState);
+ task.submit(conf, dag, path, appLr, sessionState, new LinkedList());
// validate close/reopen
verify(sessionState, times(1)).open(any(HiveConf.class));
verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043