You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original archive page) is not preserved in this text.
Posted to commits@hive.apache.org by se...@apache.org on 2017/05/20 01:16:37 UTC
[2/3] hive git commit: HIVE-16703 : Hive may add the same file to the session and vertex in Tez (Sergey Shelukhin, reviewed by Siddharth Seth)
HIVE-16703 : Hive may add the same file to the session and vertex in Tez (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d951fa4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d951fa4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d951fa4
Branch: refs/heads/master
Commit: 8d951fa4e0665942c4c1cb44a7914f70b0604f2d
Parents: 5d62dc8
Author: sergey <se...@apache.org>
Authored: Fri May 19 17:24:27 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Fri May 19 17:24:27 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/tez/DagUtils.java | 29 ++++++++++++++++----
.../hive/ql/exec/tez/TezSessionState.java | 5 ++--
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 14 +++++++---
.../hadoop/hive/ql/exec/tez/TestTezTask.java | 10 +++----
4 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index b0457be..b6e55c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -31,6 +31,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -847,12 +848,15 @@ public class DagUtils {
String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
List<LocalResource> tmpResources = new ArrayList<LocalResource>();
- addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getTempFilesFromConf(conf));
- addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE, getTempArchivesFromConf(conf));
+ addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE,
+ getTempFilesFromConf(conf), null);
+ addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE,
+ getTempArchivesFromConf(conf), null);
return tmpResources;
}
- private static String[] getTempFilesFromConf(Configuration conf) {
+ public static String[] getTempFilesFromConf(Configuration conf) {
+ if (conf == null) return new String[0]; // In tests.
String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
if (StringUtils.isNotBlank(addedFiles)) {
HiveConf.setVar(conf, ConfVars.HIVEADDEDFILES, addedFiles);
@@ -888,21 +892,34 @@ public class DagUtils {
* @throws LoginException when getDefaultDestDir fails with the same exception
*/
public List<LocalResource> localizeTempFiles(String hdfsDirPathStr, Configuration conf,
- String[] inputOutputJars) throws IOException, LoginException {
+ String[] inputOutputJars, String[] skipJars) throws IOException, LoginException {
if (inputOutputJars == null) return null;
List<LocalResource> tmpResources = new ArrayList<LocalResource>();
- addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, inputOutputJars);
+ addTempResources(conf, tmpResources, hdfsDirPathStr,
+ LocalResourceType.FILE, inputOutputJars, skipJars);
return tmpResources;
}
private void addTempResources(Configuration conf,
List<LocalResource> tmpResources, String hdfsDirPathStr,
LocalResourceType type,
- String[] files) throws IOException {
+ String[] files, String[] skipFiles) throws IOException {
+ HashSet<Path> skipFileSet = null;
+ if (skipFiles != null) {
+ skipFileSet = new HashSet<>();
+ for (String skipFile : skipFiles) {
+ if (StringUtils.isBlank(skipFile)) continue;
+ skipFileSet.add(new Path(skipFile));
+ }
+ }
for (String file : files) {
if (!StringUtils.isNotBlank(file)) {
continue;
}
+ if (skipFileSet != null && skipFileSet.contains(new Path(file))) {
+ LOG.info("Skipping vertex resource " + file + " that already exists in the session");
+ continue;
+ }
Path hdfsFilePath = new Path(hdfsDirPathStr, getResourceBaseName(new Path(file)));
LocalResource localResource = localizeResource(new Path(file),
hdfsFilePath, type, conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 036e918..fe5c6a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -476,9 +476,10 @@ public class TezSessionState {
localizedResources.addAll(lrs);
}
- // these are local resources that are set through the mr "tmpjars" property
+ // these are local resources that are set through the mr "tmpjars" property; skip session files.
List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf,
- additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]));
+ additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]),
+ DagUtils.getTempFilesFromConf(conf));
if (handlerLr != null) {
localizedResources.addAll(handlerLr);
http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 1c84c6a..3356dc9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -123,6 +123,7 @@ public class TezTask extends Task<TezWork> {
return counters;
}
+
@Override
public int execute(DriverContext driverContext) {
int rc = 1;
@@ -161,8 +162,12 @@ public class TezTask extends Task<TezWork> {
// create the tez tmp dir
scratchDir = utils.createTezDir(scratchDir, conf);
+ // This is used to compare global and vertex resources. Global resources are originally
+ // derived from session conf via localizeTempFilesFromConf. So, use that here.
+ Configuration sessionConf =
+ (session != null && session.getConf() != null) ? session.getConf() : conf;
Map<String,LocalResource> inputOutputLocalResources =
- getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
+ getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf);
// Ensure the session is open and has the necessary local resources
updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
@@ -273,10 +278,11 @@ public class TezTask extends Task<TezWork> {
* Converted the list of jars into local resources
*/
Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir,
- String[] inputOutputJars) throws Exception {
+ String[] inputOutputJars, Configuration sessionConf) throws Exception {
final Map<String,LocalResource> resources = new HashMap<String,LocalResource>();
- final List<LocalResource> localResources = utils.localizeTempFiles(
- scratchDir.toString(), jobConf, inputOutputJars);
+ // Skip the files already in session local resources...
+ final List<LocalResource> localResources = utils.localizeTempFiles(scratchDir.toString(),
+ jobConf, inputOutputJars, DagUtils.getTempFilesFromConf(sessionConf));
if (null != localResources) {
for (LocalResource lr : localResources) {
resources.put(utils.getBaseName(lr), lr);
http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 2b52056..70fedb7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -245,7 +245,7 @@ public class TestTezTask {
final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
resMap.put("foo.jar", res);
- when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+ when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
.thenReturn(resources);
when(utils.getBaseName(res)).thenReturn("foo.jar");
when(sessionState.isOpen()).thenReturn(true);
@@ -264,7 +264,7 @@ public class TestTezTask {
resMap.put("foo.jar", res);
DAG dag = mock(DAG.class);
- when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+ when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
.thenReturn(resources);
when(utils.getBaseName(res)).thenReturn("foo.jar");
when(sessionState.isOpen()).thenReturn(true);
@@ -282,11 +282,11 @@ public class TestTezTask {
final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
resMap.put("foo.jar", res);
- when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
- .thenReturn(resources);
+ when(utils.localizeTempFiles(eq(path.toString()), eq(conf), eq(inputOutputJars),
+ Mockito.<String[]>any())).thenReturn(resources);
when(utils.getBaseName(res)).thenReturn("foo.jar");
- assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
+ assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars, null));
}
@Test