Posted to commits@hive.apache.org by go...@apache.org on 2015/01/15 23:22:15 UTC

svn commit: r1652290 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/apache/hadoop/hive/ql/plan/ shims/0.20S/src/main/java/org/apach...

Author: gopalv
Date: Thu Jan 15 22:22:14 2015
New Revision: 1652290

URL: http://svn.apache.org/r1652290
Log:
HIVE-7313: Memory & SSD storage policies for temporary tables. (Gopal V, reviewed by Gunther Hagleitner)

Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
    hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
    hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Jan 15 22:22:14 2015
@@ -1615,6 +1615,10 @@ public class HiveConf extends Configurat
         "inheriting the permission of the warehouse or database directory."),
     HIVE_INSERT_INTO_EXTERNAL_TABLES("hive.insert.into.external.tables", true,
         "whether insert into external tables is allowed"),
+    HIVE_TEMPORARY_TABLE_STORAGE(
+        "hive.exec.temporary.table.storage", "default", new StringSet("memory",
+         "ssd", "default"), "Define the storage policy for temporary tables." +
+         "Choices between memory, ssd and default"),
 
     HIVE_DRIVER_RUN_HOOKS("hive.exec.driver.run.hooks", "",
         "A comma separated list of hooks which implement HiveDriverRunHook. Will be run at the beginning " +

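For context, a minimal sketch of how a session might request the new policy and how it is read back and normalized; the property name, the ConfVars entry and StoragePolicyValue.lookup() are from this patch, while the standalone driver class below is illustrative only:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;

    public class TempStoragePolicyConfig {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Ask for temporary-table data to be kept on the in-memory tier.
        conf.set("hive.exec.temporary.table.storage", "memory");

        // FileSinkOperator reads the value via HiveConf.getVar(...) and
        // normalizes it with StoragePolicyValue.lookup(...): matching is
        // case-insensitive and a missing value falls back to DEFAULT.
        StoragePolicyValue policy = StoragePolicyValue.lookup(
            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE));
        System.out.println(policy); // prints MEMORY
      }
    }
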
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Jan 15 22:22:14 2015
@@ -68,11 +68,16 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
+
 /**
  * File Sink operator implementation.
  **/
@@ -88,6 +93,7 @@ public class FileSinkOperator extends Te
   protected transient List<String> dpColNames;
   protected transient DynamicPartitionCtx dpCtx;
   protected transient boolean isCompressed;
+  protected transient boolean isTemporary;
   protected transient Path parent;
   protected transient HiveOutputFormat<?, ?> hiveOutputFormat;
   protected transient Path specPath;
@@ -318,6 +324,7 @@ public class FileSinkOperator extends Te
       this.hconf = hconf;
       filesCreated = false;
       isNativeTable = !conf.getTableInfo().isNonNative();
+      isTemporary = conf.isTemporary();
       multiFileSpray = conf.isMultiFileSpray();
       totalFiles = conf.getTotalFiles();
       numFiles = conf.getNumFiles();
@@ -384,6 +391,20 @@ public class FileSinkOperator extends Te
           valToPaths.put("", fsp); // special entry for non-DP case
         }
       }
+      
+      final StoragePolicyValue tmpStorage = StoragePolicyValue.lookup(HiveConf
+                                            .getVar(hconf, HIVE_TEMPORARY_TABLE_STORAGE));
+      if (isTemporary && fsp != null
+          && tmpStorage != StoragePolicyValue.DEFAULT) {
+        final Path outputPath = fsp.taskOutputTempPath;
+        StoragePolicyShim shim = ShimLoader.getHadoopShims()
+            .getStoragePolicyShim(fs);
+        if (shim != null) {
+          // directory creation is otherwise within the writers
+          fs.mkdirs(outputPath);
+          shim.setStoragePolicy(outputPath, tmpStorage);
+        }
+      }
 
       if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
           conf.getWriteType() == AcidUtils.Operation.DELETE) {

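A condensed standalone sketch of the sequence the operator now performs for a temporary destination; the shim calls and the mkdirs-before-setStoragePolicy ordering mirror the hunk above, while the helper method and its parameter names are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
    import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
    import org.apache.hadoop.hive.shims.ShimLoader;

    public class TempOutputPolicy {
      /** Hypothetical helper mirroring the new FileSinkOperator logic. */
      static void applyTempStoragePolicy(Configuration hconf, FileSystem fs,
          Path taskOutputTempPath, boolean isTemporary) throws IOException {
        StoragePolicyValue tmpStorage = StoragePolicyValue.lookup(
            HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE));
        if (!isTemporary || tmpStorage == StoragePolicyValue.DEFAULT) {
          return; // regular tables and the default policy need no action
        }
        StoragePolicyShim shim = ShimLoader.getHadoopShims().getStoragePolicyShim(fs);
        if (shim != null) {
          // The policy is attached to a directory, so create it up front
          // instead of leaving creation to the record writers.
          fs.mkdirs(taskOutputTempPath);
          shim.setStoragePolicy(taskOutputTempPath, tmpStorage);
        }
      }
    }
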
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Jan 15 22:22:14 2015
@@ -5857,6 +5857,7 @@ public class SemanticAnalyzer extends Ba
 
     Table dest_tab = null; // destination table if any
     boolean destTableIsAcid = false; // should the destination table be written to using ACID
+    boolean destTableIsTemporary = false;
     Partition dest_part = null;// destination partition if any
     Path queryTmpdir = null; // the intermediate destination directory
     Path dest_path = null; // the final destination directory
@@ -5874,6 +5875,7 @@ public class SemanticAnalyzer extends Ba
 
       dest_tab = qbm.getDestTableForAlias(dest);
       destTableIsAcid = isAcidTable(dest_tab);
+      destTableIsTemporary = dest_tab.isTemporary();
 
       // Is the user trying to insert into a external tables
       if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
@@ -6143,6 +6145,7 @@ public class SemanticAnalyzer extends Ba
       CreateTableDesc tblDesc = qb.getTableDesc();
       if (tblDesc != null) {
         field_schemas = new ArrayList<FieldSchema>();
+        destTableIsTemporary = tblDesc.isTemporary();
       }
 
       boolean first = true;
@@ -6287,6 +6290,8 @@ public class SemanticAnalyzer extends Ba
       fileSinkDesc.setWriteType(wt);
       acidFileSinks.add(fileSinkDesc);
     }
+    
+    fileSinkDesc.setTemporary(destTableIsTemporary);
 
     /* Set List Bucketing context. */
     if (lbCtx != null) {

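The flag is derived in two places: an INSERT whose destination table already exists and is temporary, and a CREATE TEMPORARY TABLE ... AS SELECT where the CreateTableDesc carries the flag. A hypothetical helper condensing both checks (package names per the Hive source tree; the method itself is not part of the patch):

    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.hive.ql.plan.CreateTableDesc;

    public class TempDestinationCheck {
      /** True when the file sink writes into a temporary table. */
      static boolean isTemporaryDestination(Table destTab, CreateTableDesc ctasDesc) {
        if (destTab != null && destTab.isTemporary()) {
          return true;                                    // INSERT into an existing temporary table
        }
        return ctasDesc != null && ctasDesc.isTemporary(); // CTAS with TEMPORARY
      }
    }

Either way, setTemporary(...) stores the result on the FileSinkDesc, so it travels with the serialized plan and is visible to FileSinkOperator at run time.
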
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Thu Jan 15 22:22:14 2015
@@ -48,6 +48,7 @@ public class FileSinkDesc extends Abstra
   private String compressCodec;
   private String compressType;
   private boolean multiFileSpray;
+  private boolean temporary;
   // Whether the files output by this FileSink can be merged, e.g. if they are to be put into a
   // bucketed or sorted table/partition they cannot be merged.
   private boolean canBeMerged;
@@ -220,6 +221,21 @@ public class FileSinkDesc extends Abstra
   public void setMultiFileSpray(boolean multiFileSpray) {
     this.multiFileSpray = multiFileSpray;
   }
+  
+  /**
+   * @return true if the destination is a temporary table
+   */
+  public boolean isTemporary() {
+    return temporary;
+  }
+
+  /**
+   * @param temporary whether the destination is a temporary table
+   */
+  public void setTemporary(boolean temporary) {
+    this.temporary = temporary;
+  }
+
 
   public boolean canBeMerged() {
     return canBeMerged;

Modified: hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Jan 15 22:22:14 2015
@@ -606,4 +606,9 @@ public class Hadoop20SShims extends Hado
       return kerberosName.getShortName();
     }
   }
+  
+  @Override
+  public StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
+    return null;
+  }
 }

Modified: hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Jan 15 22:22:14 2015
@@ -27,8 +27,10 @@ import java.security.AccessControlExcept
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -53,7 +55,10 @@ import org.apache.hadoop.fs.permission.A
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
@@ -91,15 +96,28 @@ public class Hadoop23Shims extends Hadoo
   HadoopShims.MiniDFSShim cluster = null;
 
   final boolean zeroCopy;
+  final boolean storagePolicy;
 
   public Hadoop23Shims() {
     boolean zcr = false;
+    boolean storage = false;
     try {
       Class.forName("org.apache.hadoop.fs.CacheFlag", false,
           ShimLoader.class.getClassLoader());
       zcr = true;
     } catch (ClassNotFoundException ce) {
     }
+    
+    if (zcr) {
+      // in-memory HDFS storage arrived after zero-copy reads, so only probe for it when zcr is present
+      try {
+        Class.forName("org.apache.hadoop.hdfs.protocol.BlockStoragePolicy",
+            false, ShimLoader.class.getClassLoader());
+        storage = true;
+      } catch (ClassNotFoundException ce) {
+      }
+    }
+    this.storagePolicy = storage;
     this.zeroCopy = zcr;
   }
 
@@ -935,4 +953,47 @@ public class Hadoop23Shims extends Hadoo
       return kerberosName.getShortName();
     }
   }
+
+  
+  public static class StoragePolicyShim implements HadoopShims.StoragePolicyShim {
+
+    private final DistributedFileSystem dfs;
+    
+    public StoragePolicyShim(DistributedFileSystem fs) {
+      this.dfs = fs;
+    }
+
+    @Override
+    public void setStoragePolicy(Path path, StoragePolicyValue policy)
+        throws IOException {
+      switch (policy) {
+      case MEMORY: {
+        dfs.setStoragePolicy(path, HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
+        break;
+      }
+      case SSD: {
+        dfs.setStoragePolicy(path, HdfsConstants.ALLSSD_STORAGE_POLICY_NAME);
+        break;
+      }
+      case DEFAULT: {
+        /* do nothing */
+        break;
+      }
+      default: 
+        throw new IllegalArgumentException("Unknown storage policy " + policy);
+      }
+    }
+  }
+
+  @Override
+  public HadoopShims.StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
+    if (!storagePolicy) {
+      return null;
+    }
+    try {
+      return new StoragePolicyShim((DistributedFileSystem) fs);
+    } catch (ClassCastException ce) {
+      return null;
+    }
+  }
 }

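For illustration, a minimal sketch of driving the shim directly against an HDFS path; the path and class name are hypothetical, and on a non-HDFS FileSystem (or a pre-BlockStoragePolicy Hadoop) getStoragePolicyShim() simply returns null:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
    import org.apache.hadoop.hive.shims.ShimLoader;

    public class SetScratchDirPolicy {
      public static void main(String[] args) throws Exception {
        Path scratch = new Path("hdfs:///tmp/hive-scratch/job-0001");
        Configuration conf = new Configuration();
        FileSystem fs = scratch.getFileSystem(conf);

        HadoopShims.StoragePolicyShim shim =
            ShimLoader.getHadoopShims().getStoragePolicyShim(fs);
        if (shim == null) {
          System.out.println("storage policies not supported on this filesystem");
          return;
        }
        fs.mkdirs(scratch);
        // SSD expands to HdfsConstants.ALLSSD_STORAGE_POLICY_NAME in the
        // Hadoop23Shims implementation above; MEMORY would select the
        // in-memory policy name instead.
        shim.setStoragePolicy(scratch, StoragePolicyValue.SSD);
      }
    }
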
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Jan 15 22:22:14 2015
@@ -26,6 +26,7 @@ import java.security.AccessControlExcept
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
@@ -388,6 +390,33 @@ public interface HadoopShims {
   public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
 
   public Map<String, String> getHadoopConfNames();
+  
+  /**
+   * Create a shim for DFS storage policy.
+   */
+  
+  public enum StoragePolicyValue {
+    MEMORY, /* 1-replica memory */
+    SSD, /* 3-replica ssd */
+    DEFAULT /* system defaults (usually 3-replica disk) */;
+
+    public static StoragePolicyValue lookup(String name) {
+      if (name == null) {
+        return DEFAULT;
+      }
+      return StoragePolicyValue.valueOf(name.toUpperCase().trim());
+    }
+  };
+  
+  public interface StoragePolicyShim {
+    void setStoragePolicy(Path path, StoragePolicyValue policy) throws IOException;
+  }
+  
+  /**
+   *  obtain a storage policy shim associated with the filesystem.
+   *  Returns null when the filesystem has no storage policies.
+   */
+  public StoragePolicyShim getStoragePolicyShim(FileSystem fs);
 
   /**
    * a hadoop.io ByteBufferPool shim.
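
Finally, a small sketch of the lookup() normalization defined above: matching is case-insensitive and whitespace-tolerant, a null value falls back to DEFAULT, and anything outside the enum surfaces as an IllegalArgumentException from valueOf(). The test scaffold is illustrative; run with -ea to enable the asserts:

    import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;

    public class StoragePolicyLookupExample {
      public static void main(String[] args) {
        assert StoragePolicyValue.lookup(" ssd ") == StoragePolicyValue.SSD;
        assert StoragePolicyValue.lookup("Memory") == StoragePolicyValue.MEMORY;
        assert StoragePolicyValue.lookup(null) == StoragePolicyValue.DEFAULT;
        try {
          StoragePolicyValue.lookup("tape");   // not a defined policy
        } catch (IllegalArgumentException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
      }
    }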