You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/09/22 21:32:49 UTC

svn commit: r1626877 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/tez/ java/org/apache/hadoop/hive/ql/plan/ test/org/apache/hadoop/hive/ql/exec/tez/ test/org/apache/hadoop/hive/ql/plan/

Author: sershe
Date: Mon Sep 22 19:32:48 2014
New Revision: 1626877

URL: http://svn.apache.org/r1626877
Log:
HIVE-7950 : StorageHandler resources aren't added to Tez Session if Session is already Open (Josh Elser, reviewed by Gopal V and Sergey Shelukhin)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Mon Sep 22 19:32:48 2014
@@ -270,11 +270,16 @@ public class TezSessionPoolManager {
 
   public void closeAndOpen(TezSessionState sessionState, HiveConf conf)
       throws Exception {
+    closeAndOpen(sessionState, conf, null);
+  }
+
+  public void closeAndOpen(TezSessionState sessionState, HiveConf conf,
+      String[] additionalFiles) throws Exception {
     HiveConf sessionConf = sessionState.getConf();
     if (sessionConf != null && sessionConf.get("tez.queue.name") != null) {
       conf.set("tez.queue.name", sessionConf.get("tez.queue.name"));
     }
     close(sessionState);
-    sessionState.open(conf);
+    sessionState.open(conf, additionalFiles);
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Sep 22 19:32:48 2014
@@ -55,6 +55,7 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -124,14 +125,11 @@ public class TezTask extends Task<TezWor
       // create the tez tmp dir
       scratchDir = utils.createTezDir(scratchDir, conf);
 
-      if (!session.isOpen()) {
-        // can happen if the user sets the tez flag after the session was
-        // established
-        LOG.info("Tez session hasn't been created yet. Opening session");
-        session.open(conf, inputOutputJars);
-      } else {
-        session.refreshLocalResourcesFromConf(conf);
-      }
+      Map<String,LocalResource> inputOutputLocalResources =
+          getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
+
+      // Ensure the session is open and has the necessary local resources
+      updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
 
       List<LocalResource> additionalLr = session.getLocalizedResources();
 
@@ -153,8 +151,12 @@ public class TezTask extends Task<TezWor
       // next we translate the TezWork to a Tez DAG
       DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
 
+      // Add the extra resources to the dag
+      addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
+
       // submit will send the job to the cluster and start executing
-      client = submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr);
+      client = submit(jobConf, dag, scratchDir, appJarLr, session,
+          additionalLr, inputOutputJars, inputOutputLocalResources);
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor();
@@ -195,6 +197,63 @@ public class TezTask extends Task<TezWor
     return rc;
   }
 
+  /**
+   * Converts the list of jars into local resources, keyed by base name
+   */
+  Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir,
+      String[] inputOutputJars) throws Exception {
+    final Map<String,LocalResource> resources = new HashMap<String,LocalResource>();
+    final List<LocalResource> localResources = utils.localizeTempFiles(
+        scratchDir.toString(), jobConf, inputOutputJars);
+    if (null != localResources) {
+      for (LocalResource lr : localResources) {
+        resources.put(utils.getBaseName(lr), lr);
+      }
+    }
+    return resources;
+  }
+
+  /**
+   * Ensures that the Tez Session is open and the AM has all necessary jars configured.
+   */
+  void updateSession(TezSessionState session,
+      JobConf jobConf, Path scratchDir, String[] inputOutputJars,
+      Map<String,LocalResource> extraResources) throws Exception {
+    final boolean missingLocalResources = !session
+        .hasResources(inputOutputJars);
+
+    if (!session.isOpen()) {
+      // can happen if the user sets the tez flag after the session was
+      // established
+      LOG.info("Tez session hasn't been created yet. Opening session");
+      session.open(conf, inputOutputJars);
+    } else {
+      LOG.info("Session is already open");
+
+      // Ensure the open session has the necessary resources (StorageHandler)
+      if (missingLocalResources) {
+        LOG.info("Tez session missing resources," +
+            " adding additional necessary resources");
+        session.getSession().addAppMasterLocalFiles(extraResources);
+      }
+
+      session.refreshLocalResourcesFromConf(conf);
+    }
+  }
+
+  /**
+   * Adds any necessary resources that must be localized in each vertex to the DAG.
+   */
+  void addExtraResourcesToDag(TezSessionState session, DAG dag,
+      String[] inputOutputJars,
+      Map<String,LocalResource> inputOutputLocalResources) throws Exception {
+    if (!session.hasResources(inputOutputJars)) {
+      if (null != inputOutputLocalResources) {
+        dag.addTaskLocalFiles(inputOutputLocalResources);
+      }
+    }
+  }
+
   DAG build(JobConf conf, TezWork work, Path scratchDir,
       LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx)
       throws Exception {
@@ -287,7 +346,8 @@ public class TezTask extends Task<TezWor
 
   DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
       LocalResource appJarLr, TezSessionState sessionState,
-      List<LocalResource> additionalLr)
+      List<LocalResource> additionalLr, String[] inputOutputJars,
+      Map<String,LocalResource> inputOutputLocalResources)
       throws Exception {
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
@@ -308,7 +368,7 @@ public class TezTask extends Task<TezWor
       console.printInfo("Tez session was closed. Reopening...");
 
       // close the old one, but keep the tmp files around
-      TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf);
+      TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars);
       console.printInfo("Session re-established.");
 
       dagClient = sessionState.getSession().submitDAG(dag);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java Mon Sep 22 19:32:48 2014
@@ -19,12 +19,13 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -305,15 +306,23 @@ public class TezWork extends AbstractOpe
       work.configureJobConf(jobConf);
     }
     String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
-    if (oldTmpJars != null && (oldTmpJars.length != 0)) {
-      if (newTmpJars != null && (newTmpJars.length != 0)) {
-        String[] combinedTmpJars = new String[newTmpJars.length + oldTmpJars.length];
-        System.arraycopy(oldTmpJars, 0, combinedTmpJars, 0, oldTmpJars.length);
-        System.arraycopy(newTmpJars, 0, combinedTmpJars, oldTmpJars.length, newTmpJars.length);
-        jobConf.setStrings(MR_JAR_PROPERTY, combinedTmpJars);
+    if (oldTmpJars != null || newTmpJars != null) {
+      String[] finalTmpJars;
+      if (oldTmpJars == null || oldTmpJars.length == 0) {
+        // Avoid a copy when oldTmpJars is null or empty
+        finalTmpJars = newTmpJars;
+      } else if (newTmpJars == null || newTmpJars.length == 0) {
+        // Avoid a copy when newTmpJars is null or empty
+        finalTmpJars = oldTmpJars;
       } else {
-        jobConf.setStrings(MR_JAR_PROPERTY, oldTmpJars);
+        // Both are non-empty, only copy now
+        finalTmpJars = new String[oldTmpJars.length + newTmpJars.length];
+        System.arraycopy(oldTmpJars, 0, finalTmpJars, 0, oldTmpJars.length);
+        System.arraycopy(newTmpJars, 0, finalTmpJars, oldTmpJars.length, newTmpJars.length);
       }
+
+      jobConf.setStrings(MR_JAR_PROPERTY, finalTmpJars);
+      return finalTmpJars;
     }
     return newTmpJars;
    }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java Mon Sep 22 19:32:48 2014
@@ -26,6 +26,7 @@ import java.util.Random;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.apache.hadoop.hive.conf.HiveConf;
 
 public class TestTezSessionPool {
@@ -157,4 +158,29 @@ public class TestTezSessionPool {
         }
       }
     }
+
+  @Test
+  public void testCloseAndOpenDefault() throws Exception {
+    poolManager = new TestTezSessionPoolManager();
+    TezSessionState session = Mockito.mock(TezSessionState.class);
+    Mockito.when(session.isDefault()).thenReturn(false);
+
+    poolManager.closeAndOpen(session, conf);
+
+    Mockito.verify(session).close(false);
+    Mockito.verify(session).open(conf, null);
+  }
+
+  @Test
+  public void testCloseAndOpenWithResources() throws Exception {
+    poolManager = new TestTezSessionPoolManager();
+    TezSessionState session = Mockito.mock(TezSessionState.class);
+    Mockito.when(session.isDefault()).thenReturn(false);
+    String[] extraResources = new String[] { "file:///tmp/foo.jar" };
+
+    poolManager.closeAndOpen(session, conf, extraResources);
+
+    Mockito.verify(session).close(false);
+    Mockito.verify(session).open(conf, extraResources);
+  }
 }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Mon Sep 22 19:32:48 2014
@@ -30,9 +30,11 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -204,9 +206,10 @@ public class TestTezTask {
   @Test
   public void testSubmit() throws Exception {
     DAG dag = DAG.create("test");
-    task.submit(conf, dag, path, appLr, sessionState, new LinkedList());
+    task.submit(conf, dag, path, appLr, sessionState, Collections.<LocalResource> emptyList(),
+        new String[0], Collections.<String,LocalResource> emptyMap());
     // validate close/reopen
-    verify(sessionState, times(1)).open(any(HiveConf.class));
+    verify(sessionState, times(1)).open(any(HiveConf.class), any(String[].class));
     verify(sessionState, times(1)).close(eq(false));  // now uses pool after HIVE-7043
     verify(session, times(2)).submitDAG(any(DAG.class));
   }
@@ -216,4 +219,54 @@ public class TestTezTask {
     task.close(work, 0);
     verify(op, times(4)).jobClose(any(Configuration.class), eq(true));
   }
+
+  @Test
+  public void testExistingSessionGetsStorageHandlerResources() throws Exception {
+    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+    LocalResource res = mock(LocalResource.class);
+    final List<LocalResource> resources = Collections.singletonList(res);
+    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+    resMap.put("foo.jar", res);
+
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+        .thenReturn(resources);
+    when(utils.getBaseName(res)).thenReturn("foo.jar");
+    when(sessionState.isOpen()).thenReturn(true);
+    when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
+    task.updateSession(sessionState, conf, path, inputOutputJars, resMap);
+    verify(session).addAppMasterLocalFiles(resMap);
+  }
+
+  @Test
+  public void testExtraResourcesAddedToDag() throws Exception {
+    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+    LocalResource res = mock(LocalResource.class);
+    final List<LocalResource> resources = Collections.singletonList(res);
+    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+    resMap.put("foo.jar", res);
+    DAG dag = mock(DAG.class);
+
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+        .thenReturn(resources);
+    when(utils.getBaseName(res)).thenReturn("foo.jar");
+    when(sessionState.isOpen()).thenReturn(true);
+    when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
+    task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap);
+    verify(dag).addTaskLocalFiles(resMap);
+  }
+
+  @Test
+  public void testGetExtraLocalResources() throws Exception {
+    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+    LocalResource res = mock(LocalResource.class);
+    final List<LocalResource> resources = Collections.singletonList(res);
+    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
+    resMap.put("foo.jar", res);
+
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+        .thenReturn(resources);
+    when(utils.getBaseName(res)).thenReturn("foo.jar");
+
+    assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
+  }
 }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java?rev=1626877&r1=1626876&r2=1626877&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java Mon Sep 22 19:32:48 2014
@@ -23,11 +23,16 @@ import java.util.List;
 import junit.framework.Assert;
 
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.hadoop.mapred.JobConf;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestTezWork {
 
+  private static final String MR_JAR_PROPERTY = "tmpjars";
   private List<BaseWork> nodes;
   private TezWork work;
 
@@ -156,4 +161,75 @@ public class TestTezWork {
       Assert.assertEquals(sorted.get(i), nodes.get(4-i));
     }
   }
+
+  @Test
+  public void testConfigureJars() throws Exception {
+    final JobConf conf = new JobConf();
+    conf.set(MR_JAR_PROPERTY, "file:///tmp/foo1.jar");
+    BaseWork baseWork = Mockito.mock(BaseWork.class);
+    Mockito.doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        conf.set(MR_JAR_PROPERTY, "file:///tmp/foo2.jar");
+        return null;
+      }
+
+    }).when(baseWork).configureJobConf(conf);
+
+    work.add(baseWork);
+    work.configureJobConfAndExtractJars(conf);
+    Assert.assertEquals("file:///tmp/foo1.jar,file:///tmp/foo2.jar", conf.get(MR_JAR_PROPERTY));
+  }
+
+  @Test
+  public void testConfigureJarsNoExtraJars() throws Exception {
+    final JobConf conf = new JobConf();
+    conf.set(MR_JAR_PROPERTY, "file:///tmp/foo1.jar");
+    BaseWork baseWork = Mockito.mock(BaseWork.class);
+
+    work.add(baseWork);
+    work.configureJobConfAndExtractJars(conf);
+    Assert.assertEquals("file:///tmp/foo1.jar", conf.get(MR_JAR_PROPERTY));
+  }
+
+  @Test
+  public void testConfigureJarsWithNull() throws Exception {
+    final JobConf conf = new JobConf();
+    conf.set(MR_JAR_PROPERTY, "file:///tmp/foo1.jar");
+    BaseWork baseWork = Mockito.mock(BaseWork.class);
+    Mockito.doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        conf.unset(MR_JAR_PROPERTY);
+        return null;
+      }
+
+    }).when(baseWork).configureJobConf(conf);
+
+    work.add(baseWork);
+    work.configureJobConfAndExtractJars(conf);
+    Assert.assertEquals("file:///tmp/foo1.jar", conf.get(MR_JAR_PROPERTY));
+  }
+
+  @Test
+  public void testConfigureJarsStartingWithNull() throws Exception {
+    final JobConf conf = new JobConf();
+    conf.unset(MR_JAR_PROPERTY);
+    BaseWork baseWork = Mockito.mock(BaseWork.class);
+    Mockito.doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        conf.setStrings(MR_JAR_PROPERTY, "file:///tmp/foo1.jar", "file:///tmp/foo2.jar");
+        return null;
+      }
+
+    }).when(baseWork).configureJobConf(conf);
+
+    work.add(baseWork);
+    work.configureJobConfAndExtractJars(conf);
+    Assert.assertEquals("file:///tmp/foo1.jar,file:///tmp/foo2.jar", conf.get(MR_JAR_PROPERTY));
+  }
 }