You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/09/22 21:32:49 UTC
svn commit: r1626877 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/tez/ java/org/apache/hadoop/hive/ql/plan/
test/org/apache/hadoop/hive/ql/exec/tez/ test/org/apache/hadoop/hive/ql/plan/
Author: sershe
Date: Mon Sep 22 19:32:48 2014
New Revision: 1626877
URL: http://svn.apache.org/r1626877
Log:
HIVE-7950 : StorageHandler resources aren't added to Tez Session if the Session is already Open (Josh Elser, reviewed by Gopal V and Sergey Shelukhin)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Mon Sep 22 19:32:48 2014
@@ -270,11 +270,16 @@ public class TezSessionPoolManager {
+ /**
+ * Closes the session and reopens it without additional files; delegates to
+ * closeAndOpen(sessionState, conf, null).
+ */
public void closeAndOpen(TezSessionState sessionState, HiveConf conf)
throws Exception {
+ closeAndOpen(sessionState, conf, null);
+ }
+
+ /**
+ * Closes the given session and reopens it with conf and additionalFiles.
+ *
+ * If the session's current conf defines tez.queue.name, that value is copied into
+ * conf (this mutates the caller's conf object) before the session is reopened.
+ *
+ * @param sessionState session to close and reopen
+ * @param conf configuration used to reopen the session
+ * @param additionalFiles extra files passed through to TezSessionState.open; may be null
+ * @throws Exception if closing or reopening the session fails
+ */
+ public void closeAndOpen(TezSessionState sessionState, HiveConf conf,
+ String[] additionalFiles) throws Exception {
HiveConf sessionConf = sessionState.getConf();
if (sessionConf != null && sessionConf.get("tez.queue.name") != null) {
conf.set("tez.queue.name", sessionConf.get("tez.queue.name"));
}
close(sessionState);
- sessionState.open(conf);
+ sessionState.open(conf, additionalFiles);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Sep 22 19:32:48 2014
@@ -55,6 +55,7 @@ import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGClient;
@@ -124,14 +125,11 @@ public class TezTask extends Task<TezWor
// create the tez tmp dir
scratchDir = utils.createTezDir(scratchDir, conf);
- if (!session.isOpen()) {
- // can happen if the user sets the tez flag after the session was
- // established
- LOG.info("Tez session hasn't been created yet. Opening session");
- session.open(conf, inputOutputJars);
- } else {
- session.refreshLocalResourcesFromConf(conf);
- }
+ Map<String,LocalResource> inputOutputLocalResources =
+ getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
+
+ // Ensure the session is open and has the necessary local resources
+ updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
List<LocalResource> additionalLr = session.getLocalizedResources();
@@ -153,8 +151,12 @@ public class TezTask extends Task<TezWor
// next we translate the TezWork to a Tez DAG
DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
+ // Add the extra resources to the dag
+ addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
+
// submit will send the job to the cluster and start executing
- client = submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr);
+ client = submit(jobConf, dag, scratchDir, appJarLr, session,
+ additionalLr, inputOutputJars, inputOutputLocalResources);
// finally monitor will print progress until the job is done
TezJobMonitor monitor = new TezJobMonitor();
@@ -195,6 +197,63 @@ public class TezTask extends Task<TezWor
return rc;
}
+ /**
+ * Converts the list of jars into local resources, keyed by base name.
+ *
+ * @param jobConf configuration passed to utils.localizeTempFiles
+ * @param scratchDir directory whose string form is passed to utils.localizeTempFiles
+ * @param inputOutputJars jars to localize
+ * @return map from utils.getBaseName(resource) to resource; empty when
+ *         localizeTempFiles returns null
+ * @throws Exception propagated from utils.localizeTempFiles
+ */
+ Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir,
+ String[] inputOutputJars) throws Exception {
+ final Map<String,LocalResource> resources = new HashMap<String,LocalResource>();
+ final List<LocalResource> localResources = utils.localizeTempFiles(
+ scratchDir.toString(), jobConf, inputOutputJars);
+ // localizeTempFiles may return null; treat that as "no extra resources".
+ if (null != localResources) {
+ for (LocalResource lr : localResources) {
+ // A later resource with the same base name replaces an earlier one.
+ resources.put(utils.getBaseName(lr), lr);
+ }
+ }
+ return resources;
+ }
+
+ /**
+ * Ensures that the Tez Session is open and the AM has all necessary jars configured.
+ *
+ * If the session is not open, it is opened with the task's conf and inputOutputJars.
+ * If it is already open and session.hasResources(inputOutputJars) is false,
+ * extraResources are added to the running session as AM local files. In the
+ * already-open case the session's local resources are then refreshed from conf.
+ *
+ * NOTE(review): jobConf and scratchDir are not read by this method; the session is
+ * opened and refreshed with the task's conf field, not with jobConf.
+ *
+ * @param session the Tez session to open or update
+ * @param jobConf currently unused
+ * @param scratchDir currently unused
+ * @param inputOutputJars jars the session is expected to have
+ * @param extraResources local resources added to an already-open session that is
+ *        missing inputOutputJars
+ * @throws Exception if opening or updating the session fails
+ */
+ void updateSession(TezSessionState session,
+ JobConf jobConf, Path scratchDir, String[] inputOutputJars,
+ Map<String,LocalResource> extraResources) throws Exception {
+ // Evaluated once, before any open/add, against the session's state on entry.
+ final boolean missingLocalResources = !session
+ .hasResources(inputOutputJars);
+
+ if (!session.isOpen()) {
+ // can happen if the user sets the tez flag after the session was
+ // established
+ LOG.info("Tez session hasn't been created yet. Opening session");
+ session.open(conf, inputOutputJars);
+ } else {
+ LOG.info("Session is already open");
+
+ // Ensure the open session has the necessary resources (StorageHandler)
+ if (missingLocalResources) {
+ LOG.info("Tez session missing resources," +
+ " adding additional necessary resources");
+ session.getSession().addAppMasterLocalFiles(extraResources);
+ }
+
+ session.refreshLocalResourcesFromConf(conf);
+ }
+ }
+
+ /**
+ * Adds any necessary resources that must be localized in each vertex to the DAG.
+ *
+ * The resources are added as task local files only when
+ * session.hasResources(inputOutputJars) is false and the map is non-null.
+ *
+ * NOTE(review): hasResources is queried again here instead of reusing the value
+ * computed in updateSession; confirm whether adding AM local files changes its answer.
+ *
+ * @param session session whose resources are checked
+ * @param dag DAG that receives the task local files
+ * @param inputOutputJars jars checked against the session
+ * @param inputOutputLocalResources resources to add to the DAG; may be null
+ * @throws Exception propagated from the session or DAG calls
+ */
+ void addExtraResourcesToDag(TezSessionState session, DAG dag,
+ String[] inputOutputJars,
+ Map<String,LocalResource> inputOutputLocalResources) throws Exception {
+ if (!session.hasResources(inputOutputJars)) {
+ if (null != inputOutputLocalResources) {
+ dag.addTaskLocalFiles(inputOutputLocalResources);
+ }
+ }
+ }
+
DAG build(JobConf conf, TezWork work, Path scratchDir,
LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx)
throws Exception {
@@ -287,7 +346,8 @@ public class TezTask extends Task<TezWor
DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
LocalResource appJarLr, TezSessionState sessionState,
- List<LocalResource> additionalLr)
+ List<LocalResource> additionalLr, String[] inputOutputJars,
+ Map<String,LocalResource> inputOutputLocalResources)
throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
@@ -308,7 +368,7 @@ public class TezTask extends Task<TezWor
console.printInfo("Tez session was closed. Reopening...");
// close the old one, but keep the tmp files around
- TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
+ TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars);
console.printInfo("Session re-established.");
dagClient = sessionState.getSession().submitDAG(dag);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java Mon Sep 22 19:32:48 2014
@@ -19,12 +19,13 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -305,15 +306,23 @@ public class TezWork extends AbstractOpe
work.configureJobConf(jobConf);
}
String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
- if (oldTmpJars != null && (oldTmpJars.length != 0)) {
- if (newTmpJars != null && (newTmpJars.length != 0)) {
- String[] combinedTmpJars = new String[newTmpJars.length + oldTmpJars.length];
- System.arraycopy(oldTmpJars, 0, combinedTmpJars, 0, oldTmpJars.length);
- System.arraycopy(newTmpJars, 0, combinedTmpJars, oldTmpJars.length, newTmpJars.length);
- jobConf.setStrings(MR_JAR_PROPERTY, combinedTmpJars);
+ if (oldTmpJars != null || newTmpJars != null) {
+ String[] finalTmpJars;
+ if (oldTmpJars == null || oldTmpJars.length == 0) {
+ // Avoid a copy when oldTmpJars is null or empty
+ finalTmpJars = newTmpJars;
+ } else if (newTmpJars == null || newTmpJars.length == 0) {
+ // Avoid a copy when newTmpJars is null or empty
+ finalTmpJars = oldTmpJars;
} else {
- jobConf.setStrings(MR_JAR_PROPERTY, oldTmpJars);
+ // Both are non-empty, only copy now
+ finalTmpJars = new String[oldTmpJars.length + newTmpJars.length];
+ System.arraycopy(oldTmpJars, 0, finalTmpJars, 0, oldTmpJars.length);
+ System.arraycopy(newTmpJars, 0, finalTmpJars, oldTmpJars.length, newTmpJars.length);
}
+
+ jobConf.setStrings(MR_JAR_PROPERTY, finalTmpJars);
+ return finalTmpJars;
}
return newTmpJars;
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java Mon Sep 22 19:32:48 2014
@@ -26,6 +26,7 @@ import java.util.Random;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.apache.hadoop.hive.conf.HiveConf;
public class TestTezSessionPool {
@@ -157,4 +158,29 @@ public class TestTezSessionPool {
}
}
}
+
+ /**
+ * The two-argument closeAndOpen closes a non-default session and reopens it
+ * with a null additional-files array.
+ */
+ @Test
+ public void testCloseAndOpenDefault() throws Exception {
+ poolManager = new TestTezSessionPoolManager();
+ TezSessionState session = Mockito.mock(TezSessionState.class);
+ Mockito.when(session.isDefault()).thenReturn(false);
+
+ poolManager.closeAndOpen(session, conf);
+
+ Mockito.verify(session).close(false);
+ Mockito.verify(session).open(conf, null);
+ }
+
+ /**
+ * The three-argument closeAndOpen passes the given extra resources through to open.
+ */
+ @Test
+ public void testCloseAndOpenWithResources() throws Exception {
+ poolManager = new TestTezSessionPoolManager();
+ TezSessionState session = Mockito.mock(TezSessionState.class);
+ Mockito.when(session.isDefault()).thenReturn(false);
+ String[] extraResources = new String[] { "file:///tmp/foo.jar" };
+
+ poolManager.closeAndOpen(session, conf, extraResources);
+
+ Mockito.verify(session).close(false);
+ Mockito.verify(session).open(conf, extraResources);
+ }
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Mon Sep 22 19:32:48 2014
@@ -30,9 +30,11 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -204,9 +206,10 @@ public class TestTezTask {
+ /**
+ * Submits a DAG and verifies the session is closed once, reopened once, and the
+ * DAG is submitted twice.
+ */
@Test
public void testSubmit() throws Exception {
DAG dag = DAG.create("test");
- task.submit(conf, dag, path, appLr, sessionState, new LinkedList());
+ // No additional local resources, extra jars, or extra resource map are passed.
+ task.submit(conf, dag, path, appLr, sessionState, Collections.<LocalResource> emptyList(),
+ new String[0], Collections.<String,LocalResource> emptyMap());
// validate close/reopen
- verify(sessionState, times(1)).open(any(HiveConf.class));
+ verify(sessionState, times(1)).open(any(HiveConf.class), any(String[].class));
verify(sessionState, times(1)).close(eq(false)); // now uses pool after HIVE-7043
verify(session, times(2)).submitDAG(any(DAG.class));
}
@@ -216,4 +219,54 @@ public class TestTezTask {
task.close(work, 0);
verify(op, times(4)).jobClose(any(Configuration.class), eq(true));
}
+
+ /**
+ * An already-open session that lacks the jars gets the resource map added as AM
+ * local files.
+ * NOTE(review): updateSession does not call utils, so the localizeTempFiles and
+ * getBaseName stubs below are not exercised by this test.
+ */
+ @Test
+ public void testExistingSessionGetsStorageHandlerResources() throws Exception {
+ final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+ LocalResource res = mock(LocalResource.class);
+ final List<LocalResource> resources = Collections.singletonList(res);
+ final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+ resMap.put("foo.jar", res);
+
+ when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+ .thenReturn(resources);
+ when(utils.getBaseName(res)).thenReturn("foo.jar");
+ when(sessionState.isOpen()).thenReturn(true);
+ when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
+ task.updateSession(sessionState, conf, path, inputOutputJars, resMap);
+ verify(session).addAppMasterLocalFiles(resMap);
+ }
+
+ /**
+ * When the session lacks the jars, the resource map is added to the DAG as task
+ * local files.
+ * NOTE(review): addExtraResourcesToDag reads only the hasResources stub; the utils
+ * and isOpen stubs are unused here.
+ */
+ @Test
+ public void testExtraResourcesAddedToDag() throws Exception {
+ final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+ LocalResource res = mock(LocalResource.class);
+ final List<LocalResource> resources = Collections.singletonList(res);
+ final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+ resMap.put("foo.jar", res);
+ DAG dag = mock(DAG.class);
+
+ when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+ .thenReturn(resources);
+ when(utils.getBaseName(res)).thenReturn("foo.jar");
+ when(sessionState.isOpen()).thenReturn(true);
+ when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
+ task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap);
+ verify(dag).addTaskLocalFiles(resMap);
+ }
+
+ /**
+ * Localized resources are returned in a map keyed by their base name.
+ */
+ @Test
+ public void testGetExtraLocalResources() throws Exception {
+ final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+ LocalResource res = mock(LocalResource.class);
+ final List<LocalResource> resources = Collections.singletonList(res);
+ final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+ resMap.put("foo.jar", res);
+
+ when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+ .thenReturn(resources);
+ when(utils.getBaseName(res)).thenReturn("foo.jar");
+
+ assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
+ }
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java Mon Sep 22 19:32:48 2014
@@ -23,11 +23,16 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.hadoop.mapred.JobConf;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestTezWork {
+ private static final String MR_JAR_PROPERTY = "tmpjars";
private List<BaseWork> nodes;
private TezWork work;
@@ -156,4 +161,75 @@ public class TestTezWork {
Assert.assertEquals(sorted.get(i), nodes.get(4-i));
}
}
+
+ /**
+ * Jars already in tmpjars are kept and the jar set by the work's configureJobConf
+ * is appended after them.
+ */
+ @Test
+ public void testConfigureJars() throws Exception {
+ final JobConf conf = new JobConf();
+ conf.set(MR_JAR_PROPERTY, "file:///tmp/foo1.jar");
+ BaseWork baseWork = Mockito.mock(BaseWork.class);
+ Mockito.doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ conf.set(MR_JAR_PROPERTY, "file:///tmp/foo2.jar");
+ return null;
+ }
+
+ }).when(baseWork).configureJobConf(conf);
+
+ work.add(baseWork);
+ work.configureJobConfAndExtractJars(conf);
+ Assert.assertEquals("file:///tmp/foo1.jar,file:///tmp/foo2.jar", conf.get(MR_JAR_PROPERTY));
+ }
+
+ /**
+ * A work whose configureJobConf is a mock no-op leaves the original tmpjars intact.
+ */
+ @Test
+ public void testConfigureJarsNoExtraJars() throws Exception {
+ final JobConf conf = new JobConf();
+ conf.set(MR_JAR_PROPERTY, "file:///tmp/foo1.jar");
+ BaseWork baseWork = Mockito.mock(BaseWork.class);
+
+ work.add(baseWork);
+ work.configureJobConfAndExtractJars(conf);
+ Assert.assertEquals("file:///tmp/foo1.jar", conf.get(MR_JAR_PROPERTY));
+ }
+
+ /**
+ * If the work unsets tmpjars, the jars present beforehand are restored.
+ */
+ @Test
+ public void testConfigureJarsWithNull() throws Exception {
+ final JobConf conf = new JobConf();
+ conf.set(MR_JAR_PROPERTY, "file:///tmp/foo1.jar");
+ BaseWork baseWork = Mockito.mock(BaseWork.class);
+ Mockito.doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ conf.unset(MR_JAR_PROPERTY);
+ return null;
+ }
+
+ }).when(baseWork).configureJobConf(conf);
+
+ work.add(baseWork);
+ work.configureJobConfAndExtractJars(conf);
+ Assert.assertEquals("file:///tmp/foo1.jar", conf.get(MR_JAR_PROPERTY));
+ }
+
+ /**
+ * Starting with no tmpjars, the jars set by the work are what remains in the conf.
+ */
+ @Test
+ public void testConfigureJarsStartingWithNull() throws Exception {
+ final JobConf conf = new JobConf();
+ conf.unset(MR_JAR_PROPERTY);
+ BaseWork baseWork = Mockito.mock(BaseWork.class);
+ Mockito.doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ conf.setStrings(MR_JAR_PROPERTY, "file:///tmp/foo1.jar", "file:///tmp/foo2.jar");
+ return null;
+ }
+
+ }).when(baseWork).configureJobConf(conf);
+
+ work.add(baseWork);
+ work.configureJobConfAndExtractJars(conf);
+ Assert.assertEquals("file:///tmp/foo1.jar,file:///tmp/foo2.jar", conf.get(MR_JAR_PROPERTY));
+ }
}