You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2010/01/29 06:58:23 UTC

svn commit: r904388 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/Context.java

Author: nzhang
Date: Fri Jan 29 05:58:22 2010
New Revision: 904388

URL: http://svn.apache.org/viewvc?rev=904388&view=rev
Log:
HIVE-1109: Structured temporary directories (Zheng Shao via Ning Zhang)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=904388&r1=904387&r2=904388&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Jan 29 05:58:22 2010
@@ -37,6 +37,9 @@
     HIVE-1105. Add service script for starting metastore server.
     (John Sichi via zshao)
 
+    HIVE-1109. Structured temporary directories
+    (Zheng Shao via Ning Zhang)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -1547,3 +1550,4 @@
 
     HIVE-1087. Let user script write out binary data into a table
     (Zheng Shao via Ning Zhang)
+ 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=904388&r1=904387&r2=904388&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Fri Jan 29 05:58:22 2010
@@ -23,7 +23,11 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import org.antlr.runtime.TokenRewriteStream;
@@ -48,22 +52,43 @@
   private Path[] resDirPaths;
   private int resDirFilesNum;
   boolean initialized;
+  
+  // Path component of hive.exec.scratchdir, without scheme or authority
+  // (default: "/tmp/"+System.getProperty("user.name")+"/hive").
+  // Used for creating temporary paths on external file systems.
   private String scratchPath;
-  private Path MRScratchDir;
+  // Path on the local file system
+  // System.getProperty("java.io.tmpdir") + Path.SEPARATOR
+  // + System.getProperty("user.name") + Path.SEPARATOR + executionId
   private Path localScratchDir;
-  private final ArrayList<Path> allScratchDirs = new ArrayList<Path>();
+  // On the default FileSystem (usually HDFS):
+  // also based on hive.exec.scratchdir which by default is
+  // "/tmp/"+System.getProperty("user.name")+"/hive"
+  private Path MRScratchDir;
+  
+  // externalScratchDirs maps a file system ("scheme:authority") to the scratch
+  // directory made for it; localScratchDir and MRScratchDir are tracked separately.
+  // The external scratch dirs will also be based on hive.exec.scratchdir.
+  private final Map<String,Path> externalScratchDirs = new HashMap<String,Path>();
+  
   private HiveConf conf;
-  Random rand = new Random();
-  protected int randomid = Math.abs(rand.nextInt());
   protected int pathid = 10000;
   protected boolean explain = false;
   private TokenRewriteStream tokenRewriteStream;
 
-  public Context() {
+  String executionId;
+
+  public Context(HiveConf conf) throws IOException {
+    this(conf, generateExecutionId());
   }
 
-  public Context(HiveConf conf) {
+  /**
+   * Create a Context with a given executionId.  ExecutionId, together with
+   * user name and conf, will determine the temporary directory locations. 
+   */
+  public Context(HiveConf conf, String executionId) throws IOException {
     this.conf = conf;
+    this.executionId = executionId;
     Path tmpPath = new Path(conf.getVar(HiveConf.ConfVars.SCRATCHDIR));
     scratchPath = tmpPath.toUri().getPath();
   }
@@ -88,67 +113,65 @@
   }
 
   /**
-   * Make a tmp directory on the local filesystem
-   */
-  private void makeLocalScratchDir() throws IOException {
-    while (true) {
-      localScratchDir = new Path(System.getProperty("java.io.tmpdir")
-          + File.separator + Math.abs(rand.nextInt()));
-      FileSystem fs = FileSystem.getLocal(conf);
-      if (fs.mkdirs(localScratchDir)) {
-        localScratchDir = fs.makeQualified(localScratchDir);
-        allScratchDirs.add(localScratchDir);
-        break;
-      }
-    }
-  }
-
-  /**
    * Make a tmp directory for MR intermediate data If URI/Scheme are not
    * supplied - those implied by the default filesystem will be used (which will
    * typically correspond to hdfs instance on hadoop cluster)
+   * 
+   * @param mkdir  if true, will make the directory. Will throw IOException if that fails.
    */
-  private void makeMRScratchDir() throws IOException {
-    while (true) {
-      MRScratchDir = FileUtils.makeQualified(new Path(conf
-          .getVar(HiveConf.ConfVars.SCRATCHDIR), Integer.toString(Math.abs(rand
-          .nextInt()))), conf);
-
-      if (explain) {
-        allScratchDirs.add(MRScratchDir);
-        return;
-      }
-
-      FileSystem fs = MRScratchDir.getFileSystem(conf);
-      if (fs.mkdirs(MRScratchDir)) {
-        allScratchDirs.add(MRScratchDir);
-        return;
+  private static Path makeMRScratchDir(HiveConf conf, String executionId, boolean mkdir)
+      throws IOException {
+    
+    Path dir = FileUtils.makeQualified(
+        new Path(conf.getVar(HiveConf.ConfVars.SCRATCHDIR), executionId), conf);
+    
+    if (mkdir) {
+      FileSystem fs = dir.getFileSystem(conf);
+      if (!fs.mkdirs(dir)) {
+        throw new IOException("Cannot make directory: " + dir);
       }
     }
+    return dir;
   }
 
   /**
    * Make a tmp directory on specified URI Currently will use the same path as
    * implied by SCRATCHDIR config variable
    */
-  private Path makeExternalScratchDir(URI extURI) throws IOException {
-    while (true) {
-      String extPath = scratchPath + File.separator
-          + Integer.toString(Math.abs(rand.nextInt()));
-      Path extScratchDir = new Path(extURI.getScheme(), extURI.getAuthority(),
-          extPath);
-
-      if (explain) {
-        allScratchDirs.add(extScratchDir);
-        return extScratchDir;
+  private static Path makeExternalScratchDir(HiveConf conf, String executionId,
+      boolean mkdir, URI extURI) throws IOException {
+    
+    Path dir = new Path(extURI.getScheme(), extURI.getAuthority(),
+        conf.getVar(HiveConf.ConfVars.SCRATCHDIR) + Path.SEPARATOR + executionId);
+    
+    if (mkdir) {
+      FileSystem fs = dir.getFileSystem(conf);
+      if (!fs.mkdirs(dir)) {
+        throw new IOException("Cannot make directory: " + dir);
       }
-
-      FileSystem fs = extScratchDir.getFileSystem(conf);
-      if (fs.mkdirs(extScratchDir)) {
-        allScratchDirs.add(extScratchDir);
-        return extScratchDir;
+    }
+    return dir;
+  }
+  
+  /**
+   * Make a tmp directory on the local file system.
+   * 
+   * @param mkdir  if true, will make the directory. Will throw IOException if that fails.
+   */
+  private static Path makeLocalScratchDir(HiveConf conf, String executionId, boolean mkdir)
+      throws IOException {
+    
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = fs.makeQualified(new Path(System.getProperty("java.io.tmpdir")
+        + Path.SEPARATOR + System.getProperty("user.name") + Path.SEPARATOR
+        + executionId));
+    
+    if (mkdir) {
+      if (!fs.mkdirs(dir)) {
+        throw new IOException("Cannot make directory: " + dir);
       }
     }
+    return dir;
   }
 
   /**
@@ -157,15 +180,13 @@
    */
   private String getExternalScratchDir(URI extURI) {
     try {
-      // first check if we already made a scratch dir on this URI
-      for (Path p : allScratchDirs) {
-        URI pURI = p.toUri();
-        if (strEquals(pURI.getScheme(), extURI.getScheme())
-            && strEquals(pURI.getAuthority(), extURI.getAuthority())) {
-          return p.toString();
-        }
+      String fileSystem = extURI.getScheme() + ":" + extURI.getAuthority();
+      Path dir = externalScratchDirs.get(fileSystem);
+      if (dir == null) {
+        dir = makeExternalScratchDir(conf, executionId, !explain, extURI);
+        externalScratchDirs.put(fileSystem, dir);
       }
-      return makeExternalScratchDir(extURI).toString();
+      return dir.toString();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -175,69 +196,71 @@
    * Create a map-reduce scratch directory on demand and return it
    */
   private String getMRScratchDir() {
-    if (MRScratchDir == null) {
-      try {
-        makeMRScratchDir();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      } catch (IllegalArgumentException e) {
-        throw new RuntimeException("Error while making MR scratch "
-            + "directory - check filesystem config (" + e.getCause() + ")", e);
+    try {
+      if (MRScratchDir == null) {
+        MRScratchDir = makeMRScratchDir(conf, executionId, !explain);
       }
+      return MRScratchDir.toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalArgumentException e) {
+      throw new RuntimeException("Error while making MR scratch "
+          + "directory - check filesystem config (" + e.getCause() + ")", e);
     }
-    return MRScratchDir.toString();
   }
 
   /**
    * Create a local scratch directory on demand and return it
    */
   private String getLocalScratchDir() {
-    if (localScratchDir == null) {
-      try {
-        makeLocalScratchDir();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      } catch (IllegalArgumentException e) {
-        throw new RuntimeException("Error while making local scratch "
-            + "directory - check filesystem config (" + e.getCause() + ")", e);
+    try {
+      if (localScratchDir == null) {
+        localScratchDir = makeLocalScratchDir(conf, executionId, true);
       }
+      return localScratchDir.toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalArgumentException e) {
+      throw new RuntimeException("Error while making local scratch "
+          + "directory - check filesystem config (" + e.getCause() + ")", e);
     }
-    return localScratchDir.toString();
   }
 
+  private void removeDir(Path p) {
+    try {
+      p.getFileSystem(conf).delete(p, true);
+    } catch (Exception e) {
+      LOG.warn("Error Removing Scratch: "
+          + StringUtils.stringifyException(e));
+    }
+  }
+  
   /**
    * Remove any created scratch directories
    */
   private void removeScratchDir() {
-    if (explain) {
-      try {
-        if (localScratchDir != null) {
-          FileSystem.getLocal(conf).delete(localScratchDir, true);
-        }
-      } catch (Exception e) {
-        LOG
-            .warn("Error Removing Scratch: "
-                + StringUtils.stringifyException(e));
-      }
-    } else {
-      for (Path p : allScratchDirs) {
-        try {
-          p.getFileSystem(conf).delete(p, true);
-        } catch (Exception e) {
-          LOG.warn("Error Removing Scratch: "
-              + StringUtils.stringifyException(e));
-        }
-      }
+    
+    for (Map.Entry<String,Path> p : externalScratchDirs.entrySet()) {
+      removeDir(p.getValue());
+    }
+    externalScratchDirs.clear();
+    
+    if (MRScratchDir != null) {
+      removeDir(MRScratchDir);
+      MRScratchDir = null;
+    }
+    
+    if (localScratchDir != null) {
+      removeDir(localScratchDir);
+      localScratchDir = null;
     }
-    MRScratchDir = null;
-    localScratchDir = null;
   }
 
   /**
    * Return the next available path in the current scratch dir
    */
   private String nextPath(String base) {
-    return base + File.separator + Integer.toString(pathid++);
+    return base + Path.SEPARATOR + Integer.toString(pathid++);
   }
 
   /**
@@ -425,4 +448,20 @@
   public TokenRewriteStream getTokenRewriteStream() {
     return tokenRewriteStream;
   }
+  
+  /**
+   * Generate a unique executionId.  An executionId, together with user name and
+   * the configuration, will determine the temporary locations of all intermediate
+   * files.
+   * 
+   * In the future, users can use the executionId to resume a query.
+   */
+  public static String generateExecutionId() {
+    Random rand = new Random();
+    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS");
+    String executionId = "hive_" + format.format(new Date()) + "_"
+        + Math.abs(rand.nextLong());
+    return executionId;
+  }
+  
 }