You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@hive.apache.org by se...@apache.org on 2014/03/28 00:13:24 UTC
svn commit: r1582536 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: DagUtils.java TezSessionState.java
Author: sershe
Date: Thu Mar 27 23:13:23 2014
New Revision: 1582536
URL: http://svn.apache.org/r1582536
Log:
HIVE-6703 Tez should store SHA of the jar when uploading to cache (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1582536&r1=1582535&r2=1582536&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Thu Mar 27 23:13:23 2014
@@ -657,6 +657,7 @@ public class DagUtils {
* @return the name of the resource from a given path string.
*/
public String getResourceBaseName(String pathStr) {
+ // TODO: this should probably use Path::getName
String[] splits = pathStr.split("/");
return splits[splits.length - 1];
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1582536&r1=1582535&r2=1582536&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Thu Mar 27 23:13:23 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.t
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
@@ -29,6 +30,8 @@ import java.util.UUID;
import javax.security.auth.login.LoginException;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -127,7 +130,7 @@ public class TezSessionState {
// unless already installed on all the cluster nodes, we'll have to
// localize hive-exec.jar as well.
- appJarLr = createHiveExecLocalResource();
+ appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
// configuration for the application master
Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
@@ -238,80 +241,107 @@ public class TezSessionState {
}
/**
- * Returns a local resource representing the hive-exec jar. This resource will
- * be used to execute the plan on the cluster.
+ * Returns a local resource representing a jar.
+ * This resource will be used to execute the plan on the cluster.
+ * @param localJarPath Local path to the jar to be localized.
* @return LocalResource corresponding to the localized hive exec resource.
* @throws IOException when any file system related call fails.
* @throws LoginException when we are unable to determine the user.
* @throws URISyntaxException when current jar location cannot be determined.
*/
- private LocalResource createHiveExecLocalResource()
- throws IOException, LoginException, URISyntaxException {
- String hiveJarDir = utils.getHiveJarDirectory(conf);
- String currentVersionPathStr = utils.getExecJarPathLocal();
- String currentJarName = utils.getResourceBaseName(currentVersionPathStr);
- FileSystem fs = null;
- Path jarPath = null;
- FileStatus dirStatus = null;
+ private LocalResource createJarLocalResource(String localJarPath)
+ throws IOException, LoginException, IllegalArgumentException,
+ FileNotFoundException {
+ Path destDirPath = null;
+ FileSystem destFs = null;
+ FileStatus destDirStatus = null;
+
+ {
+ String hiveJarDir = utils.getHiveJarDirectory(conf);
+ if (hiveJarDir != null) {
+ LOG.info("Hive jar directory is " + hiveJarDir);
+ // check if it is a valid directory in HDFS
+ destDirPath = new Path(hiveJarDir);
+ destFs = destDirPath.getFileSystem(conf);
+ destDirStatus = validateTargetDir(destDirPath, destFs);
+ }
+ }
- if (hiveJarDir != null) {
- LOG.info("Hive jar directory is " + hiveJarDir);
- // check if it is a valid directory in HDFS
- Path hiveJarDirPath = new Path(hiveJarDir);
- fs = hiveJarDirPath.getFileSystem(conf);
+ /*
+ * Specified location does not exist or is not a directory.
+ * Try to push the jar to the hdfs location pointed by config variable HIVE_INSTALL_DIR.
+ * Path will be HIVE_INSTALL_DIR/{username}/.hiveJars/
+ * This will probably never ever happen.
+ */
+ if (destDirStatus == null || !destDirStatus.isDir()) {
+ destDirPath = utils.getDefaultDestDir(conf);
+ LOG.info("Jar dir is null/directory doesn't exist. Choosing HIVE_INSTALL_DIR - "
+ + destDirPath);
+ destFs = destDirPath.getFileSystem(conf);
+ destDirStatus = validateTargetDir(destDirPath, destFs);
+ }
- if (!(fs instanceof DistributedFileSystem)) {
- throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDir));
- }
+ // we couldn't find any valid locations. Throw exception
+ if (destDirStatus == null || !destDirStatus.isDir()) {
+ throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg());
+ }
- try {
- dirStatus = fs.getFileStatus(hiveJarDirPath);
- } catch (FileNotFoundException fe) {
- // do nothing
- }
- if ((dirStatus != null) && (dirStatus.isDir())) {
- FileStatus[] listFileStatus = fs.listStatus(hiveJarDirPath);
- for (FileStatus fstatus : listFileStatus) {
- String jarName = utils.getResourceBaseName(fstatus.getPath().toString());
- if (jarName.equals(currentJarName)) {
- // we have found the jar we need.
- jarPath = fstatus.getPath();
- return utils.localizeResource(null, jarPath, conf);
- }
- }
+ Path localFile = new Path(localJarPath);
+ String sha = getSha(localFile);
+ String destFileName = localFile.getName();
+
+ // Now, try to find the file based on SHA and name. Currently we require exact name match.
+ // We could also allow cutting off versions and other stuff provided that SHA matches...
+ destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
+ + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
- // jar wasn't in the directory, copy the one in current use
- if (jarPath == null) {
- Path dest = new Path(hiveJarDir + "/" + currentJarName);
- return utils.localizeResource(new Path(currentVersionPathStr), dest, conf);
- }
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The destination file name for [" + localJarPath + "] is " + destFileName);
}
- /*
- * specified location does not exist or is not a directory
- * try to push the jar to the hdfs location pointed by
- * config variable HIVE_INSTALL_DIR. Path will be
- * HIVE_INSTALL_DIR/{username}/.hiveJars/
- */
- if ((hiveJarDir == null) || (dirStatus == null) ||
- ((dirStatus != null) && (!dirStatus.isDir()))) {
- Path dest = utils.getDefaultDestDir(conf);
- LOG.info("Jar dir is null/directory doesn't exist. Choosing HIVE_INSTALL_DIR - " + dest);
- String destPathStr = dest.toString();
- String jarPathStr = destPathStr + "/" + currentJarName;
- dirStatus = fs.getFileStatus(dest);
- if (dirStatus.isDir()) {
- return utils.localizeResource(new Path(currentVersionPathStr), new Path(jarPathStr), conf);
- } else {
- throw new IOException(ErrorMsg.INVALID_DIR.format(dest.toString()));
+ // TODO: if this method is ever called on more than one jar, getting the dir and the
+ // list need to be refactored out to be done only once.
+ Path jarPath = null;
+ FileStatus[] listFileStatus = destFs.listStatus(destDirPath);
+ for (FileStatus fstatus : listFileStatus) {
+ String jarName = utils.getResourceBaseName(fstatus.getPath().toString()); // ...
+ if (jarName.equals(destFileName)) {
+ // We have found the jar we need.
+ jarPath = fstatus.getPath();
+ return utils.localizeResource(null, jarPath, conf);
}
- }
+ }
- // we couldn't find any valid locations. Throw exception
- throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg());
+ // Jar wasn't in the directory, copy the one in current use.
+ assert jarPath == null;
+ Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
+ return utils.localizeResource(localFile, destFile, conf);
+ }
+
+ private FileStatus validateTargetDir(Path hiveJarDirPath, FileSystem fs) throws IOException {
+ if (!(fs instanceof DistributedFileSystem)) {
+ throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(hiveJarDirPath.toString()));
+ }
+ try {
+ return fs.getFileStatus(hiveJarDirPath);
+ } catch (FileNotFoundException fe) {
+ // do nothing
+ }
+ return null;
}
+ private String getSha(Path localFile) throws IOException, IllegalArgumentException {
+ InputStream is = null;
+ try {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ is = localFs.open(localFile);
+ return DigestUtils.sha256Hex(is);
+ } finally {
+ if (is != null) {
+ is.close();
+ }
+ }
+ }
public void setQueueName(String queueName) {
this.queueName = queueName;
}