Posted to commits@hive.apache.org by se...@apache.org on 2014/04/26 00:11:36 UTC

svn commit: r1590164 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: exec/tez/DagUtils.java exec/tez/TezSessionPoolManager.java exec/tez/TezTask.java session/SessionState.java

Author: sershe
Date: Fri Apr 25 22:11:35 2014
New Revision: 1590164

URL: http://svn.apache.org/r1590164
Log:
HIVE-6898 : Functions in hive are failing with java.lang.ClassNotFoundException on Tez (Vikram Dixit K, reviewed by Sergey Shelukhin)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1590164&r1=1590163&r2=1590164&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Apr 25 22:11:35 2014
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
@@ -32,7 +28,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
 import javax.security.auth.login.LoginException;
@@ -45,7 +40,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -85,6 +79,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.client.PreWarmContext;
+import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -96,13 +92,11 @@ import org.apache.tez.dag.api.GroupInput
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -118,6 +112,10 @@ import org.apache.tez.runtime.library.ou
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
  * map and reduce work to tez vertices and edges. It handles configuration
@@ -254,7 +252,7 @@ public class DagUtils {
   /**
    * Given two vertices a, b update their configurations to be used in an Edge a-b
    */
-  public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) 
+  public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
     throws IOException {
 
     // Tez needs to setup output subsequent input pairs correctly
@@ -311,13 +309,13 @@ public class DagUtils {
         break;
 
       case CUSTOM_EDGE:
-        
+
         dataMovementType = DataMovementType.CUSTOM;
         logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
         logicalInputClass = ShuffledUnorderedKVInput.class;
         EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
             CustomPartitionEdge.class.getName());
-        CustomEdgeConfiguration edgeConf = 
+        CustomEdgeConfiguration edgeConf =
             new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
           DataOutputBuffer dob = new DataOutputBuffer();
           edgeConf.write(dob);
@@ -432,7 +430,7 @@ public class DagUtils {
     }
     if (vertexHasCustomInput) {
       useTezGroupedSplits = false;
-      // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat 
+      // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
       // here would cause pre-mature grouping which would be incorrect.
       inputFormatClass = HiveInputFormat.class;
       conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
@@ -661,6 +659,11 @@ public class DagUtils {
       String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
 
+    addTempFiles(conf, tmpResources, hdfsDirPathStr, getTempFilesFromConf(conf));
+    return tmpResources;
+  }
+
+  public static String[] getTempFilesFromConf(Configuration conf) {
     String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
     if (StringUtils.isNotBlank(addedFiles)) {
       HiveConf.setVar(conf, ConfVars.HIVEADDEDFILES, addedFiles);
@@ -679,8 +682,7 @@ public class DagUtils {
     // need to localize the additional jars and files
     // we need the directory on hdfs to which we shall put all these files
     String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives;
-    addTempFiles(conf, tmpResources, hdfsDirPathStr, allFiles.split(","));
-    return tmpResources;
+    return allFiles.split(",");
   }
 
   /**
@@ -900,7 +902,7 @@ public class DagUtils {
    * @param work The instance of BaseWork representing the actual work to be performed
    * by this vertex.
    * @param scratchDir HDFS scratch dir for this execution unit.
-   * @param list 
+   * @param list
    * @param appJarLr Local resource for hive-exec.
    * @param additionalLr
    * @param fileSystem FS corresponding to scratchDir and LocalResources
@@ -908,7 +910,7 @@ public class DagUtils {
    * @return Vertex
    */
   public Vertex createVertex(JobConf conf, BaseWork work,
-      Path scratchDir, LocalResource appJarLr, 
+      Path scratchDir, LocalResource appJarLr,
       List<LocalResource> additionalLr,
       FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception {
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1590164&r1=1590163&r2=1590164&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Fri Apr 25 22:11:35 2014
@@ -21,8 +21,6 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
-import javax.security.auth.login.LoginException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -106,7 +104,8 @@ public class TezSessionPoolManager {
     }
   }
 
-  private TezSessionState getSession(HiveConf conf, boolean doOpen)
+  private TezSessionState getSession(HiveConf conf, boolean doOpen,
+      boolean forceCreate)
       throws Exception {
 
     String queueName = conf.get("tez.queue.name");
@@ -119,8 +118,9 @@ public class TezSessionPoolManager {
      * their own credentials. We expect that with the new security model, things will
      * run as user hive in most cases.
      */
-    if (!(this.inited) || ((queueName != null) && (!queueName.isEmpty()))
-       || (nonDefaultUser) || (defaultQueuePool == null) || (blockingQueueLength <= 0)) {
+    if (forceCreate || !(this.inited)
+        || ((queueName != null) && (!queueName.isEmpty()))
+        || (nonDefaultUser) || (defaultQueuePool == null) || (blockingQueueLength <= 0)) {
       LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser +
           " defaultQueuePool: " + defaultQueuePool +
           " blockingQueueLength: " + blockingQueueLength);
@@ -156,7 +156,7 @@ public class TezSessionPoolManager {
   public void returnSession(TezSessionState tezSessionState)
       throws Exception {
     if (tezSessionState.isDefault()) {
-      LOG.info("The session " + tezSessionState.getSessionId() 
+      LOG.info("The session " + tezSessionState.getSessionId()
           + " belongs to the pool. Put it back in");
       SessionState sessionState = SessionState.get();
       if (sessionState != null) {
@@ -194,15 +194,7 @@ public class TezSessionPoolManager {
 
   public TezSessionState getSession(
       TezSessionState session, HiveConf conf, boolean doOpen) throws Exception {
-    if (canWorkWithSameSession(session, conf)) {
-      return session;
-    }
-
-    if (session != null) {
-      session.close(false);
-    }
-
-    return getSession(conf, doOpen);
+    return getSession(session, conf, doOpen, false);
   }
 
   /*
@@ -262,4 +254,17 @@ public class TezSessionPoolManager {
 
     return true;
   }
+
+  public TezSessionState getSession(TezSessionState session, HiveConf conf,
+      boolean doOpen, boolean forceCreate) throws Exception {
+    if (canWorkWithSameSession(session, conf)) {
+      return session;
+    }
+
+    if (session != null) {
+      session.close(false);
+    }
+
+    return getSession(conf, doOpen, forceCreate);
+  }
 }
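
The three-argument getSession now delegates to a new public overload that adds
a forceCreate flag: when the passed-in session cannot be reused, forceCreate
makes the pool manager open a brand-new session instead of handing back one
from the shared default pool. A minimal sketch of a caller, assuming an
existing session and HiveConf are in hand; the wrapper class and method are
hypothetical.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
    import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;

    public class ForceCreateSketch {
      // Swap 'current' for a session that is guaranteed not to come from the
      // default pool if 'current' turns out not to be reusable.
      static TezSessionState refresh(TezSessionState current, HiveConf conf)
          throws Exception {
        return TezSessionPoolManager.getInstance()
            .getSession(current, conf, true /* doOpen */, true /* forceCreate */);
      }
    }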

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1590164&r1=1590163&r2=1590164&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Fri Apr 25 22:11:35 2014
@@ -23,7 +23,6 @@ import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +33,6 @@ import javax.security.auth.login.LoginEx
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -52,7 +50,6 @@ import org.apache.hadoop.hive.ql.session
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.TezSession;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -80,7 +77,7 @@ public class TezTask extends Task<TezWor
 
   private TezCounters counters;
 
-  private DagUtils utils;
+  private final DagUtils utils;
 
   public TezTask() {
     this(DagUtils.getInstance());
@@ -129,12 +126,22 @@ public class TezTask extends Task<TezWor
 
       // create the tez tmp dir
       scratchDir = utils.createTezDir(scratchDir, conf);
+      boolean hasResources = session.hasResources(inputOutputJars);
+
+      if (ss.hasAddedResource()) {
+        // need to re-launch session because of new added jars.
+        hasResources = false;
+        // reset the added resource flag for this session since we would
+        // relocalize (either by restarting or relocalizing) due to the above
+        // hasResources flag.
+        ss.setAddedResource(false);
+      }
 
       // If we have any jars from input format, we need to restart the session because
       // AM will need them; so, AM has to be restarted. What a mess...
-      if (!session.hasResources(inputOutputJars) && session.isOpen()) {
+      if (!hasResources && session.isOpen()) {
         LOG.info("Tez session being reopened to pass custom jars to AM");
-        session.close(false);
+        TezSessionPoolManager.getInstance().close(session);
         session = TezSessionPoolManager.getInstance().getSession(null, conf, false);
         ss.setTezSession(session);
       }
@@ -246,15 +253,15 @@ public class TezTask extends Task<TezWor
           vertexArray[i++] = workToVertex.get(v);
         }
         VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
-        
+
         // now hook up the children
         for (BaseWork v: children) {
           // need to pairwise patch up the configuration of the vertices
           for (BaseWork part: unionWorkItems) {
-            utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part), 
+            utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
                  workToConf.get(v), workToVertex.get(v));
           }
-          
+
           // finally we can create the grouped edge
           GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
                workToVertex.get(v), work.getEdgeProperty(w, v));
@@ -264,14 +271,14 @@ public class TezTask extends Task<TezWor
       } else {
         // Regular vertices
         JobConf wxConf = utils.initializeVertexConf(conf, w);
-        Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr, 
+        Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr,
           additionalLr, fs, ctx, !isFinal, work);
         dag.addVertex(wx);
         utils.addCredentials(w, dag);
         perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
         workToVertex.put(w, wx);
         workToConf.put(w, wxConf);
-        
+
         // add all dependencies (i.e.: edges) to the graph
         for (BaseWork v: work.getChildren(w)) {
           assert workToVertex.containsKey(v);
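
Taken together, TezTask now treats the session as lacking resources whenever
the SessionState reports that new resources were ADDed since the last launch,
and it hands the stale session to the pool manager's close() instead of closing
it directly before fetching a replacement. A sketch of that reopen path as a
stand-alone helper, placed in the same package as TezTask so it can reach the
pool manager the same way; the class and method names are hypothetical.

    // Hypothetical helper mirroring the reopen path in the hunk above.
    package org.apache.hadoop.hive.ql.exec.tez;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class SessionReopenSketch {
      // Close the stale session via the pool manager, fetch a replacement, and
      // register it with the SessionState so later tasks see the new session.
      static TezSessionState reopen(TezSessionState stale, HiveConf conf,
          SessionState ss) throws Exception {
        TezSessionPoolManager.getInstance().close(stale);
        TezSessionState fresh =
            TezSessionPoolManager.getInstance().getSession(null, conf, false);
        ss.setTezSession(fresh);
        return fresh;
      }
    }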

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1590164&r1=1590163&r2=1590164&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Fri Apr 25 22:11:35 2014
@@ -163,6 +163,8 @@ public class SessionState {
   private final String CONFIG_AUTHZ_SETTINGS_APPLIED_MARKER =
       "hive.internal.ss.authz.settings.applied.marker";
 
+  private boolean addedResource;
+
   /**
    * Lineage state.
    */
@@ -736,6 +738,7 @@ public class SessionState {
     getConsole().printInfo("Added resource: " + fnlVal);
     resourceMap.add(fnlVal);
 
+    addedResource = true;
     return fnlVal;
   }
 
@@ -1033,4 +1036,11 @@ public class SessionState {
 
   }
 
+  public boolean hasAddedResource() {
+    return addedResource;
+  }
+
+  public void setAddedResource(boolean addedResouce) {
+    this.addedResource = addedResouce;
+  }
 }
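
The new addedResource flag is what lets TezTask (above) notice that ADD
FILE/JAR/ARCHIVE ran since the Tez AM was launched: add_resource() sets it, and
the consumer reads and then clears it. A minimal sketch of that contract,
assuming a started SessionState; the jar path below is made up.

    import org.apache.hadoop.hive.ql.session.SessionState;

    public class AddedResourceSketch {
      static void example() throws Exception {
        // Assumes SessionState.start() has already run for this thread.
        SessionState ss = SessionState.get();
        // Registering a resource (the effect of ADD JAR) sets the flag.
        ss.add_resource(SessionState.ResourceType.JAR, "/tmp/my-udfs.jar");
        if (ss.hasAddedResource()) {
          // ... a consumer such as TezTask re-localizes session resources here ...
          ss.setAddedResource(false);  // and clears the flag once handled
        }
      }
    }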