You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/03/28 00:13:24 UTC

svn commit: r1582536 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: DagUtils.java TezSessionState.java

Author: sershe
Date: Thu Mar 27 23:13:23 2014
New Revision: 1582536

URL: http://svn.apache.org/r1582536
Log:
HIVE-6703 Tez should store SHA of the jar when uploading to cache (Sergey Shelukhin, reviewed by Gunther Hagleitner)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1582536&r1=1582535&r2=1582536&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Thu Mar 27 23:13:23 2014
@@ -657,6 +657,7 @@ public class DagUtils {
    * @return the name of the resource from a given path string.
    */
   public String getResourceBaseName(String pathStr) {
+    // TODO: this should probably use Path::getName
     String[] splits = pathStr.split("/");
     return splits[splits.length - 1];
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1582536&r1=1582535&r2=1582536&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Thu Mar 27 23:13:23 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.t
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,6 +30,8 @@ import java.util.UUID;
 
 import javax.security.auth.login.LoginException;
 
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -127,7 +130,7 @@ public class TezSessionState {
 
     // unless already installed on all the cluster nodes, we'll have to
     // localize hive-exec.jar as well.
-    appJarLr = createHiveExecLocalResource();
+    appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
 
     // configuration for the application master
     Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
@@ -238,80 +241,107 @@ public class TezSessionState {
   }
 
   /**
-   * Returns a local resource representing the hive-exec jar. This resource will
-   * be used to execute the plan on the cluster.
+   * Returns a local resource representing a jar.
+   * This resource will be used to execute the plan on the cluster.
+   * @param localJarPath Local path to the jar to be localized.
    * @return LocalResource corresponding to the localized hive exec resource.
    * @throws IOException when any file system related call fails.
    * @throws LoginException when we are unable to determine the user.
    * @throws URISyntaxException when current jar location cannot be determined.
    */
-  private LocalResource createHiveExecLocalResource()
-    throws IOException, LoginException, URISyntaxException {
-    String hiveJarDir = utils.getHiveJarDirectory(conf);
-    String currentVersionPathStr = utils.getExecJarPathLocal();
-    String currentJarName = utils.getResourceBaseName(currentVersionPathStr);
-    FileSystem fs = null;
-    Path jarPath = null;
-    FileStatus dirStatus = null;
+  private LocalResource createJarLocalResource(String localJarPath)
+      throws IOException, LoginException, IllegalArgumentException,
+      FileNotFoundException {
+    Path destDirPath = null;
+    FileSystem destFs = null;
+    FileStatus destDirStatus = null;
+
+    {
+      String hiveJarDir = utils.getHiveJarDirectory(conf);
+      if (hiveJarDir != null) {
+        LOG.info("Hive jar directory is " + hiveJarDir);
+        // check if it is a valid directory in HDFS
+        destDirPath = new Path(hiveJarDir);
+        destFs = destDirPath.getFileSystem(conf);
+        destDirStatus = validateTargetDir(destDirPath, destFs);
+      }
+    }
 
-    if (hiveJarDir != null) {
-      LOG.info("Hive jar directory is " + hiveJarDir);
-      // check if it is a valid directory in HDFS
-      Path hiveJarDirPath = new Path(hiveJarDir);
-      fs = hiveJarDirPath.getFileSystem(conf);
+    /*
+     * Specified location does not exist or is not a directory.
+     * Try to push the jar to the HDFS location pointed to by config variable HIVE_INSTALL_DIR.
+     * Path will be HIVE_INSTALL_DIR/{username}/.hiveJars/
+     * This will probably never ever happen.
+     */
+    if (destDirStatus == null || !destDirStatus.isDir()) {
+      destDirPath = utils.getDefaultDestDir(conf);
+      LOG.info("Jar dir is null/directory doesn't exist. Choosing HIVE_INSTALL_DIR - "
+          + destDirPath);
+      destFs = destDirPath.getFileSystem(conf);
+      destDirStatus = validateTargetDir(destDirPath, destFs);
+    }
 
-      if (!(fs instanceof DistributedFileSystem)) {
-        throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDir));
-      }
+    // we couldn't find any valid locations. Throw exception
+    if (destDirStatus == null || !destDirStatus.isDir()) {
+      throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg());
+    }
 
-      try {
-        dirStatus = fs.getFileStatus(hiveJarDirPath);
-      } catch (FileNotFoundException fe) {
-        // do nothing
-      }
-      if ((dirStatus != null) && (dirStatus.isDir())) {
-        FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath);
-        for (FileStatus fstatus : listFileStatus) {
-          String jarName = utils.getResourceBaseName(fstatus.getPath().toString());
-          if (jarName.equals(currentJarName)) {
-            // we have found the jar we need.
-            jarPath = fstatus.getPath();
-            return utils.localizeResource(null, jarPath, conf);
-          }
-        }
+    Path localFile = new Path(localJarPath);
+    String sha = getSha(localFile);
+    String destFileName = localFile.getName();
+
+    // Now, try to find the file based on SHA and name. Currently we require exact name match.
+    // We could also allow cutting off versions and other stuff provided that SHA matches...
+    destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
+        + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
 
-        // jar wasn't in the directory, copy the one in current use
-        if (jarPath == null) {
-          Path dest = new Path(hiveJarDir + "/" + currentJarName);
-          return utils.localizeResource(new Path(currentVersionPathStr), dest, conf);
-        }
-      }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("The destination file name for [" + localJarPath + "] is " + destFileName);
     }
 
-    /*
-     * specified location does not exist or is not a directory
-     * try to push the jar to the hdfs location pointed by
-     * config variable HIVE_INSTALL_DIR. Path will be
-     * HIVE_INSTALL_DIR/{username}/.hiveJars/
-     */
-    if ((hiveJarDir == null) || (dirStatus == null) ||
-        ((dirStatus != null) && (!dirStatus.isDir()))) {
-      Path dest = utils.getDefaultDestDir(conf);
-      LOG.info("Jar dir is null/directory doesn't exist. Choosing HIVE_INSTALL_DIR - " + dest);
-      String destPathStr = dest.toString();
-      String jarPathStr = destPathStr + "/" + currentJarName;
-      dirStatus = fs.getFileStatus(dest);
-      if (dirStatus.isDir()) {
-        return utils.localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf);
-      } else {
-        throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
+    // TODO: if this method is ever called on more than one jar, getting the dir and the
+    //       list need to be refactored out to be done only once.
+    Path jarPath = null;
+    FileStatus[] listFileStatus = destFs.listStatus(destDirPath);
+    for (FileStatus fstatus : listFileStatus) {
+      String jarName = utils.getResourceBaseName(fstatus.getPath().toString()); // ...
+      if (jarName.equals(destFileName)) {
+        // We have found the jar we need.
+        jarPath = fstatus.getPath();
+        return utils.localizeResource(null, jarPath, conf);
       }
-        }
+    }
 
-    // we couldn't find any valid locations. Throw exception
-    throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg());
+    // Jar wasn't in the directory, copy the one in current use.
+    assert jarPath == null;
+    Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
+    return utils.localizeResource(localFile, destFile, conf);
+  }
+
+  private FileStatus validateTargetDir(Path hiveJarDirPath, FileSystem fs) throws IOException {
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDirPath.toString()));
+    }
+    try {
+      return fs.getFileStatus(hiveJarDirPath);
+    } catch (FileNotFoundException fe) {
+      // do nothing
+    }
+    return null;
   }
 
+  private String getSha(Path localFile) throws IOException, IllegalArgumentException {
+    InputStream is = null;
+    try {
+      FileSystem localFs = FileSystem.getLocal(conf);
+      is = localFs.open(localFile);
+      return DigestUtils.sha256Hex(is);
+    } finally {
+      if (is != null) {
+        is.close();
+      }
+    }
+  }
   public void setQueueName(String queueName) {
     this.queueName = queueName;
   }