Posted to commits@hive.apache.org by se...@apache.org on 2016/03/30 04:02:26 UTC

hive git commit: HIVE-13303 : spill to YARN directories, not tmp, when available (Sergey Shelukhin, reviewed by Gopal V)

Repository: hive
Updated Branches:
  refs/heads/master 56b645981 -> 20a8192a2


HIVE-13303 : spill to YARN directories, not tmp, when available (Sergey Shelukhin, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/20a8192a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/20a8192a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/20a8192a

Branch: refs/heads/master
Commit: 20a8192a2b8f36da5ef2d5d61d77de1e70188b1d
Parents: 56b6459
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Mar 29 18:57:27 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Mar 29 18:57:27 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    | 54 ++++++++++++++++++++
 .../hadoop/hive/llap/io/api/LlapProxy.java      |  2 +
 .../org/apache/hadoop/hive/llap/LlapUtil.java   | 26 ++++++++++
 .../hive/llap/daemon/impl/LlapDaemon.java       |  6 +--
 .../persistence/HybridHashTableContainer.java   | 40 ++++++++++-----
 .../ql/exec/persistence/KeyValueContainer.java  | 25 +++++----
 .../ql/exec/persistence/ObjectContainer.java    | 24 ++++-----
 .../hive/ql/exec/persistence/RowContainer.java  | 34 ++++++------
 .../hadoop/hive/ql/exec/tez/DagUtils.java       |  1 +
 .../mapjoin/VectorMapJoinRowBytesContainer.java | 24 ++++-----
 .../ql/exec/persistence/TestHashPartition.java  |  3 +-
 .../TestVectorMapJoinRowBytesContainer.java     |  3 +-
 12 files changed, 169 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
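
For orientation, the end-to-end flow this commit sets up: a spilling operator asks
RowContainer.getLocalDirsForSpillFiles(conf) for the directory list (LLAP work dirs
inside a daemon, yarn.nodemanager.local-dirs otherwise) and hands it to the new
FileUtils.createLocalDirsTempFile, which picks one configured directory at random
and falls back to java.io.tmpdir only when nothing usable is set. A minimal sketch
of a caller — not part of the patch, and the directory values are illustrative:

    import java.io.File;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.common.FileUtils;
    import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;

    public class SpillDirExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative value; in a real cluster the NodeManager sets this.
        conf.set("yarn.nodemanager.local-dirs", "/mnt/disk1/nm,/mnt/disk2/nm");
        // LLAP-aware resolution of the spill directory list.
        String spillDirs = RowContainer.getLocalDirsForSpillFiles(conf);
        // Creates a temp directory under one of the local dirs; falls back
        // to java.io.tmpdir if none of them is usable.
        File dir = FileUtils.createLocalDirsTempFile(spillDirs, "example-spill", "", true);
        System.out.println(dir.getAbsolutePath());
      }
    }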


http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 8c9bd3d..51340d8 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -27,6 +27,7 @@ import java.security.AccessControlException;
 import java.security.PrivilegedExceptionAction;
 import java.util.BitSet;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +55,7 @@ import org.slf4j.LoggerFactory;
  */
 public final class FileUtils {
   private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class.getName());
+  private static final Random random = new Random();
 
   public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {
     @Override
@@ -827,6 +830,57 @@ public final class FileUtils {
     return tmpFile;
   }
 
+  public static File createLocalDirsTempFile(String localDirList, String prefix, String suffix,
+      boolean isDirectory) throws IOException {
+    if (localDirList == null || localDirList.isEmpty()) {
+      return createFileInTmp(prefix, suffix, "Local directories not specified", isDirectory);
+    }
+    String[] localDirs = StringUtils.getTrimmedStrings(localDirList);
+    if (localDirs.length == 0) {
+      return createFileInTmp(prefix, suffix, "Local directories not specified", isDirectory);
+    }
+    // TODO: we could stagger these to threads by ID, but that can also lead to bad effects.
+    String path = localDirs[random.nextInt(localDirs.length)];
+    if (path == null || path.isEmpty()) {
+      return createFileInTmp(prefix, suffix, "Empty path for one of the local dirs", isDirectory);
+    }
+    File targetDir = new File(path);
+    if (!targetDir.exists() && !targetDir.mkdirs()) {
+      return createFileInTmp(prefix, suffix, "Cannot access or create " + targetDir, isDirectory);
+    }
+    try {
+      File file = File.createTempFile(prefix, suffix, targetDir);
+      if (isDirectory && (!file.delete() || !file.mkdirs())) {
+        // TODO: or we could just generate a name ourselves and not do this?
+        return createFileInTmp(prefix, suffix,
+            "Cannot recreate " + file + " as directory", isDirectory);
+      }
+      file.deleteOnExit();
+      return file;
+    } catch (IOException ex) {
+      LOG.error("Error creating a file in " + targetDir, ex);
+      return createFileInTmp(prefix, suffix, "Cannot create a file in " + targetDir, isDirectory);
+    }
+  }
+
+  private static File createFileInTmp(String prefix, String suffix,
+      String reason, boolean isDirectory) throws IOException {
+    File file = File.createTempFile(prefix, suffix);
+    if (isDirectory && (!file.delete() || !file.mkdirs())) {
+      // TODO: or we could just generate a name ourselves and not do this?
+      throw new IOException("Cannot recreate " + file + " as directory");
+    }
+    file.deleteOnExit();
+    LOG.info(reason + "; created a tmp file: " + file.getAbsolutePath());
+    return file;
+  }
+
+  public static File createLocalDirsTempFile(Configuration conf, String prefix, String suffix,
+      boolean isDirectory) throws IOException {
+    return createLocalDirsTempFile(
+        conf.get("yarn.nodemanager.local-dirs"), prefix, suffix, isDirectory);
+  }
+
   /**
    * delete a temporary file and remove it from delete-on-exit hook.
    */

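Every failure mode in createLocalDirsTempFile (no dirs configured, an empty entry,
a directory that cannot be created, createTempFile throwing) degrades to
createFileInTmp, i.e. plain java.io.tmpdir, with the reason logged rather than
propagated. A small sketch of the no-configuration case; the cast picks the String
overload over the Configuration one:

    // No local dirs configured: logs the reason and returns a file under
    // java.io.tmpdir instead of failing.
    File f = FileUtils.createLocalDirsTempFile((String) null, "spill", ".tmp", false);
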
http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
index 6359bab..424769f 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapProxy.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.security.LlapTokenProvider;
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
new file mode 100644
index 0000000..ce03de0
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+
+public class LlapUtil {
+  public static String getDaemonLocalDirList(Configuration conf) {
+    String localDirList = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_WORK_DIRS);
+    if (localDirList != null && !localDirList.isEmpty()) return localDirList;
+    return conf.get("yarn.nodemanager.local-dirs");
+  }
+}

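Note the precedence in getDaemonLocalDirList: an explicit LLAP work-dirs setting
wins, and yarn.nodemanager.local-dirs is consulted only when it is null or empty.
A small illustration, assuming hive.llap.daemon.work.dirs is the property name
behind ConfVars.LLAP_DAEMON_WORK_DIRS:

    Configuration conf = new Configuration();
    conf.set("yarn.nodemanager.local-dirs", "/mnt/nm");
    conf.set("hive.llap.daemon.work.dirs", "/mnt/llap"); // assumed property name
    // Returns "/mnt/llap"; without the second set() it would return "/mnt/nm".
    String dirs = LlapUtil.getDaemonLocalDirList(conf);
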
http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 22d7eec..c8734a5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
@@ -333,10 +334,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       LlapDaemonConfiguration daemonConf = new LlapDaemonConfiguration();
       int numExecutors = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
 
-      String localDirList = HiveConf.getVar(daemonConf, ConfVars.LLAP_DAEMON_WORK_DIRS);
-      if (localDirList == null || localDirList.isEmpty()) {
-        localDirList = daemonConf.get("yarn.nodemanager.local-dirs");
-      }
+      String localDirList = LlapUtil.getDaemonLocalDirList(daemonConf);
       String[] localDirs = (localDirList == null || localDirList.isEmpty()) ?
           new String[0] : StringUtils.getTrimmedStrings(localDirList);
       int rpcPort = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_PORT);

http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index f6471db..f5da5a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.exec.persistence;
 
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectOutputStream;
@@ -30,6 +32,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -110,6 +113,8 @@ public class HybridHashTableContainer
 
   private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
+  private final String spillLocalDirs;
+
   /**
    * This class encapsulates the triplet together since they are closely related to each other
    * The triplet: hashmap (either in memory or on disk), small table container, big table container
@@ -127,12 +132,13 @@ public class HybridHashTableContainer
     float loadFactor;                       // Same as above
     int wbSize;                             // Same as above
     int rowsOnDisk;                         // How many rows saved to the on-disk hashmap (if on disk)
+    private final String spillLocalDirs;
 
     /* It may happen that there's not enough memory to instantiate a hashmap for the partition.
      * In that case, we don't create the hashmap, but pretend the hashmap is directly "spilled".
      */
     public HashPartition(int initialCapacity, float loadFactor, int wbSize, long maxProbeSize,
-                         boolean createHashMap) {
+                         boolean createHashMap, String spillLocalDirs) {
       if (createHashMap) {
         // Probe space should be at least equal to the size of our designated wbSize
         maxProbeSize = Math.max(maxProbeSize, wbSize);
@@ -141,6 +147,7 @@ public class HybridHashTableContainer
         hashMapSpilledOnCreation = true;
         hashMapOnDisk = true;
       }
+      this.spillLocalDirs = spillLocalDirs;
       this.initialCapacity = initialCapacity;
       this.loadFactor = loadFactor;
       this.wbSize = wbSize;
@@ -187,7 +194,7 @@ public class HybridHashTableContainer
     /* Get the small table key/value container */
     public KeyValueContainer getSidefileKVContainer() {
       if (sidefileKVContainer == null) {
-        sidefileKVContainer = new KeyValueContainer();
+        sidefileKVContainer = new KeyValueContainer(spillLocalDirs);
       }
       return sidefileKVContainer;
     }
@@ -195,7 +202,7 @@ public class HybridHashTableContainer
     /* Get the big table row container */
     public ObjectContainer getMatchfileObjContainer() {
       if (matchfileObjContainer == null) {
-        matchfileObjContainer = new ObjectContainer();
+        matchfileObjContainer = new ObjectContainer(spillLocalDirs);
       }
       return matchfileObjContainer;
     }
@@ -203,7 +210,7 @@ public class HybridHashTableContainer
     /* Get the big table row bytes container for native vector map join */
     public VectorMapJoinRowBytesContainer getMatchfileRowBytesContainer() {
       if (matchfileRowBytesContainer == null) {
-        matchfileRowBytesContainer = new VectorMapJoinRowBytesContainer();
+        matchfileRowBytesContainer = new VectorMapJoinRowBytesContainer(spillLocalDirs);
       }
       return matchfileRowBytesContainer;
     }
@@ -267,12 +274,14 @@ public class HybridHashTableContainer
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
         HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT),
-        estimatedTableSize, keyCount, memoryAvailable, nwayConf);
+        estimatedTableSize, keyCount, memoryAvailable, nwayConf,
+        RowContainer.getLocalDirsForSpillFiles(hconf));
   }
 
   private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor,
       int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, float probePercent,
-      long estimatedTableSize, long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
+      long estimatedTableSize, long keyCount, long memoryAvailable, HybridHashTableConf nwayConf,
+      String spillLocalDirs)
       throws SerDeException, IOException {
     directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter();
 
@@ -282,6 +291,7 @@ public class HybridHashTableContainer
     memoryThreshold = memoryAvailable;
     tableRowSize = estimatedTableSize / (keyCount != 0 ? keyCount : 1);
     memoryCheckFrequency = memCheckFreq;
+    this.spillLocalDirs = spillLocalDirs;
 
     this.nwayConf = nwayConf;
     int writeBufferSize;
@@ -343,21 +353,22 @@ public class HybridHashTableContainer
           nwayConf.getLoadedContainerList().size() == 0) {  // n-way join, first (biggest) small table
         if (i == 0) { // We unconditionally create a hashmap for the first hash partition
           hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
-              maxCapacity, true);
+              maxCapacity, true, spillLocalDirs);
         } else {
           // To check whether we have enough memory to allocate for another hash partition,
           // we need to get the size of the first hash partition to get an idea.
           hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
-              maxCapacity, memoryUsed + hashPartitions[0].hashMap.memorySize() < memoryThreshold);
+              maxCapacity, memoryUsed + hashPartitions[0].hashMap.memorySize() < memoryThreshold,
+              spillLocalDirs);
         }
       } else {                                              // n-way join, all later small tables
         // For all later small tables, follow the same pattern of the previously loaded tables.
         if (this.nwayConf.doSpillOnCreation(i)) {
           hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
-              maxCapacity, false);
+              maxCapacity, false, spillLocalDirs);
         } else {
           hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
-              maxCapacity, true);
+              maxCapacity, true, spillLocalDirs);
         }
       }
 
@@ -542,8 +553,9 @@ public class HybridHashTableContainer
     HashPartition partition = hashPartitions[partitionId];
     int inMemRowCount = partition.hashMap.getNumValues();
 
-    Path path = Files.createTempFile("partition-" + partitionId + "-", null);
-    OutputStream outputStream = Files.newOutputStream(path);
+    File file = FileUtils.createLocalDirsTempFile(
+        spillLocalDirs, "partition-" + partitionId + "-", null, false);
+    OutputStream outputStream = new FileOutputStream(file, false);
 
     com.esotericsoftware.kryo.io.Output output =
         new com.esotericsoftware.kryo.io.Output(outputStream);
@@ -556,11 +568,11 @@ public class HybridHashTableContainer
       SerializationUtilities.releaseKryo(kryo);
     }
 
-    partition.hashMapLocalPath = path;
+    partition.hashMapLocalPath = file.toPath();
     partition.hashMapOnDisk = true;
 
     LOG.info("Spilling hash partition " + partitionId + " (Rows: " + inMemRowCount +
-        ", Mem size: " + partition.hashMap.memorySize() + "): " + path);
+        ", Mem size: " + partition.hashMap.memorySize() + "): " + file);
     LOG.info("Memory usage before spilling: " + memoryUsed);
 
     long memFreed = partition.hashMap.memorySize();

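The serialization in spillPartition is untouched; only the destination moves from
Files.createTempFile (i.e. /tmp) to a randomly chosen local dir, and file.toPath()
keeps the existing Path-based bookkeeping intact. For orientation, a stripped-down
view of the write path as it reads after this patch; the writeObject call is
assumed from the surrounding code, since it falls outside the hunk shown:

    File file = FileUtils.createLocalDirsTempFile(
        spillLocalDirs, "partition-" + partitionId + "-", null, false);
    OutputStream outputStream = new FileOutputStream(file, false);
    com.esotericsoftware.kryo.io.Output output =
        new com.esotericsoftware.kryo.io.Output(outputStream);
    Kryo kryo = SerializationUtilities.borrowKryo();
    try {
      kryo.writeObject(output, partition.hashMap); // assumed: outside the hunk
      output.close();
    } finally {
      SerializationUtilities.releaseKryo(kryo);
    }
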
http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
index d403c58..e2b22d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -52,36 +53,34 @@ public class KeyValueContainer {
   private int readCursor = 0;             // cursor during reading
   private int rowsOnDisk = 0;             // total number of pairs in output
 
-  private File parentFile;
+  private File parentDir;
   private File tmpFile;
 
   private Input input;
   private Output output;
 
-  public KeyValueContainer() {
+  public KeyValueContainer(String spillLocalDirs) {
     readBuffer = new ObjectPair[IN_MEMORY_NUM_ROWS];
     for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) {
       readBuffer[i] = new ObjectPair<HiveKey, BytesWritable>();
     }
     try {
-      setupOutput();
+      setupOutput(spillLocalDirs);
     } catch (IOException | HiveException e) {
       throw new RuntimeException("Failed to create temporary output file on disk", e);
     }
   }
 
-  private void setupOutput() throws IOException, HiveException {
+  private void setupOutput(String spillLocalDirs) throws IOException, HiveException {
     FileOutputStream fos = null;
     try {
-      if (parentFile == null) {
-        parentFile = File.createTempFile("key-value-container", "");
-        if (parentFile.delete() && parentFile.mkdir()) {
-          parentFile.deleteOnExit();
-        }
+      if (parentDir == null) {
+        parentDir = FileUtils.createLocalDirsTempFile(spillLocalDirs, "key-value-container", "", true);
+        parentDir.deleteOnExit();
       }
 
       if (tmpFile == null || input != null) {
-        tmpFile = File.createTempFile("KeyValueContainer", ".tmp", parentFile);
+        tmpFile = File.createTempFile("KeyValueContainer", ".tmp", parentDir);
         LOG.info("KeyValueContainer created temp file " + tmpFile.getAbsolutePath());
         tmpFile.deleteOnExit();
       }
@@ -131,7 +130,7 @@ public class KeyValueContainer {
     readCursor = rowsInReadBuffer = rowsOnDisk = 0;
     readBufferUsed = false;
 
-    if (parentFile != null) {
+    if (parentDir != null) {
       if (input != null) {
         try {
           input.close();
@@ -147,10 +146,10 @@ public class KeyValueContainer {
         output = null;
       }
       try {
-        FileUtil.fullyDelete(parentFile);
+        FileUtil.fullyDelete(parentDir);
       } catch (Throwable ignored) {
       }
-      parentFile = null;
+      parentDir = null;
       tmpFile = null;
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
index a976de0..ee9da23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ObjectContainer.java
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.slf4j.Logger;
@@ -53,36 +54,33 @@ public class ObjectContainer<ROW> {
   private int readCursor = 0;             // cursor during reading
   private int rowsOnDisk = 0;             // total number of objects in output
 
-  private File parentFile;
+  private File parentDir;
   private File tmpFile;
 
   private Input input;
   private Output output;
 
-  public ObjectContainer() {
+  public ObjectContainer(String spillLocalDirs) {
     readBuffer = (ROW[]) new Object[IN_MEMORY_NUM_ROWS];
     for (int i = 0; i < IN_MEMORY_NUM_ROWS; i++) {
       readBuffer[i] = (ROW) new Object();
     }
     try {
-      setupOutput();
+      setupOutput(spillLocalDirs);
     } catch (IOException | HiveException e) {
       throw new RuntimeException("Failed to create temporary output file on disk", e);
     }
   }
 
-  private void setupOutput() throws IOException, HiveException {
+  private void setupOutput(String spillLocalDirs) throws IOException, HiveException {
     FileOutputStream fos = null;
     try {
-      if (parentFile == null) {
-        parentFile = File.createTempFile("object-container", "");
-        if (parentFile.delete() && parentFile.mkdir()) {
-          parentFile.deleteOnExit();
-        }
+      if (parentDir == null) {
+        parentDir = FileUtils.createLocalDirsTempFile(spillLocalDirs, "object-container", "", true);
       }
 
       if (tmpFile == null || input != null) {
-        tmpFile = File.createTempFile("ObjectContainer", ".tmp", parentFile);
+        tmpFile = File.createTempFile("ObjectContainer", ".tmp", parentDir);
         LOG.info("ObjectContainer created temp file " + tmpFile.getAbsolutePath());
         tmpFile.deleteOnExit();
       }
@@ -112,7 +110,7 @@ public class ObjectContainer<ROW> {
     readCursor = rowsInReadBuffer = rowsOnDisk = 0;
     readBufferUsed = false;
 
-    if (parentFile != null) {
+    if (parentDir != null) {
       if (input != null) {
         try {
           input.close();
@@ -128,10 +126,10 @@ public class ObjectContainer<ROW> {
         output = null;
       }
       try {
-        FileUtil.fullyDelete(parentFile);
+        FileUtil.fullyDelete(parentDir);
       } catch (Throwable ignored) {
       }
-      parentFile = null;
+      parentDir = null;
       tmpFile = null;
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
index 358f692..893d265 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
@@ -31,6 +31,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -91,7 +94,7 @@ public class RowContainer<ROW extends List<Object>>
   private long size;    // total # of elements in the RowContainer
   private File tmpFile; // temporary file holding the spilled blocks
   Path tempOutPath = null;
-  private File parentFile;
+  private File parentDir;
   private int itrCursor; // iterator cursor in the currBlock
   private int readBlockSize; // size of current read block
   private int addCursor; // append cursor in the lastBlock
@@ -112,6 +115,8 @@ public class RowContainer<ROW extends List<Object>>
   InputSplit[] inputSplits = null;
   private ROW dummyRow = null;
   private final Reporter reporter;
+  private final String spillFileDirs;
+
 
   Writable val = null; // cached to use serialize data
 
@@ -130,6 +135,7 @@ public class RowContainer<ROW extends List<Object>>
     this.size = 0;
     this.itrCursor = 0;
     this.addCursor = 0;
+    this.spillFileDirs = getLocalDirsForSpillFiles(jc);
     this.numFlushedBlocks = 0;
     this.tmpFile = null;
     this.currentWriteBlock = (ROW[]) new ArrayList[blockSize];
@@ -145,6 +151,11 @@ public class RowContainer<ROW extends List<Object>>
     }
   }
 
+  public static String getLocalDirsForSpillFiles(Configuration conf) {
+    return LlapProxy.isDaemon()
+        ? LlapUtil.getDaemonLocalDirList(conf) : conf.get("yarn.nodemanager.local-dirs");
+  }
+
   private JobConf getLocalFSJobConfClone(Configuration jc) {
     if (this.jobCloneUsingLocalFs == null) {
       this.jobCloneUsingLocalFs = new JobConf(jc);
@@ -220,7 +231,7 @@ public class RowContainer<ROW extends List<Object>>
           }
 
           localJc.set(FileInputFormat.INPUT_DIR,
-              org.apache.hadoop.util.StringUtils.escapeString(parentFile.getAbsolutePath()));
+              org.apache.hadoop.util.StringUtils.escapeString(parentDir.getAbsolutePath()));
           inputSplits = inputFormat.getSplits(localJc, 1);
           actualSplitNum = inputSplits.length;
         }
@@ -289,7 +300,7 @@ public class RowContainer<ROW extends List<Object>>
   }
 
   private final ArrayList<Object> row = new ArrayList<Object>(2);
-
+  
   private void spillBlock(ROW[] block, int length) throws HiveException {
     try {
       if (tmpFile == null) {
@@ -445,8 +456,8 @@ public class RowContainer<ROW extends List<Object>>
       rw = null;
       rr = null;
       tmpFile = null;
-      deleteLocalFile(parentFile, true);
-      parentFile = null;
+      deleteLocalFile(parentDir, true);
+      parentDir = null;
     }
   }
 
@@ -518,21 +529,14 @@ public class RowContainer<ROW extends List<Object>>
         suffix = "." + this.keyObject.toString() + suffix;
       }
 
-      while (true) {
-        parentFile = File.createTempFile("hive-rowcontainer", "");
-        boolean success = parentFile.delete() && parentFile.mkdir();
-        if (success) {
-          break;
-        }
-        LOG.debug("retry creating tmp row-container directory...");
-      }
+      parentDir = FileUtils.createLocalDirsTempFile(spillFileDirs, "hive-rowcontainer", "", true);
 
-      tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
+      tmpFile = File.createTempFile("RowContainer", suffix, parentDir);
       LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
       // Delete the temp file if the JVM terminate normally through Hadoop job
       // kill command.
       // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
-      parentFile.deleteOnExit();
+      parentDir.deleteOnExit();
       tmpFile.deleteOnExit();
 
       // rFile = new RandomAccessFile(tmpFile, "rw");

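getLocalDirsForSpillFiles above is the single branch point for every container in
this patch: inside an LLAP daemon (LlapProxy.isDaemon()) it routes through
LlapUtil.getDaemonLocalDirList, everywhere else it reads the NodeManager dirs
directly. The non-daemon path, with an illustrative value:

    JobConf jc = new JobConf();
    jc.set("yarn.nodemanager.local-dirs", "/mnt/disk1/nm");
    // Outside an LLAP daemon this returns "/mnt/disk1/nm" unchanged,
    // or null if the property is unset.
    String spillDirs = RowContainer.getLocalDirsForSpillFiles(jc);
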
http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 79da860..8aca779 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -577,6 +577,7 @@ public class DagUtils {
       }
     }
 
+    // TODO# HERE?
     if (mapWork instanceof MergeFileWork) {
       Path outputPath = ((MergeFileWork) mapWork).getOutputDir();
       // prepare the tmp output directory. The output tmp directory should

http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
index 4c539d8..fa96ae9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 
 import java.io.File;
@@ -34,7 +35,7 @@ public class VectorMapJoinRowBytesContainer {
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinRowBytesContainer.class);
 
-  private File parentFile;
+  private File parentDir;
   private File tmpFile;
 
   // We buffer in a org.apache.hadoop.hive.serde2.ByteStream.Output since that is what
@@ -72,7 +73,9 @@ public class VectorMapJoinRowBytesContainer {
 
   private FileInputStream fileInputStream;
 
-  public VectorMapJoinRowBytesContainer() {
+  private final String spillLocalDirs;
+
+  public VectorMapJoinRowBytesContainer(String spillLocalDirs) {
     output = new Output();
     readBuffer = new byte[INPUT_SIZE];
     readNextBytes = new byte[MAX_READS][];
@@ -81,16 +84,13 @@ public class VectorMapJoinRowBytesContainer {
     isOpen = false;
     totalWriteLength = 0;
     totalReadLength = 0;
+    this.spillLocalDirs = spillLocalDirs;
   }
 
   private void setupOutputFileStreams() throws IOException {
-
-    parentFile = File.createTempFile("bytes-container", "");
-    if (parentFile.delete() && parentFile.mkdir()) {
-      parentFile.deleteOnExit();
-    }
-
-    tmpFile = File.createTempFile("BytesContainer", ".tmp", parentFile);
+    parentDir = FileUtils.createLocalDirsTempFile(spillLocalDirs, "bytes-container", "", true);
+    parentDir.deleteOnExit();
+    tmpFile = File.createTempFile("BytesContainer", ".tmp", parentDir);
     LOG.debug("BytesContainer created temp file " + tmpFile.getAbsolutePath());
     tmpFile.deleteOnExit();
 
@@ -306,13 +306,13 @@ public class VectorMapJoinRowBytesContainer {
       fileOutputStream = null;
     }
 
-    if (parentFile != null) {
+    if (parentDir != null) {
       try {
-        FileUtil.fullyDelete(parentFile);
+        FileUtil.fullyDelete(parentDir);
       } catch (Throwable ignored) {
       }
     }
-    parentFile = null;
+    parentDir = null;
     tmpFile = null;
     isOpen = false;
     totalWriteLength = 0;

http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestHashPartition.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestHashPartition.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestHashPartition.java
index a6e52bd..efabd2b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestHashPartition.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/persistence/TestHashPartition.java
@@ -24,6 +24,7 @@ public class TestHashPartition {
 
   @Test
   public void testHashPartition() throws Exception {
-    HashPartition hashPartition = new HashPartition(1024, (float) 0.75, 524288, 1, true);
+    // TODO: wtf?
+    HashPartition hashPartition = new HashPartition(1024, (float) 0.75, 524288, 1, true, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/20a8192a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinRowBytesContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinRowBytesContainer.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinRowBytesContainer.java
index 3c3aacd..afe4e70 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinRowBytesContainer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinRowBytesContainer.java
@@ -31,7 +31,8 @@ public class TestVectorMapJoinRowBytesContainer  {
   public void doFillReplay(Random random, int maxCount) throws Exception {
 
     RandomByteArrayStream randomByteArrayStream = new RandomByteArrayStream(random);
-    VectorMapJoinRowBytesContainer vectorMapJoinRowBytesContainer = new VectorMapJoinRowBytesContainer();
+    VectorMapJoinRowBytesContainer vectorMapJoinRowBytesContainer =
+        new VectorMapJoinRowBytesContainer(null);
 
     int count = Math.min(maxCount, random.nextInt(500));
     for (int i = 0; i < count; i++) {