You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2015/01/15 23:22:15 UTC
svn commit: r1652290 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/parse/
ql/src/java/org/apache/hadoop/hive/ql/plan/
shims/0.20S/src/main/java/org/apach...
Author: gopalv
Date: Thu Jan 15 22:22:14 2015
New Revision: 1652290
URL: http://svn.apache.org/r1652290
Log:
HIVE-7313: Memory & SSD storage policies for temporary tables. (Gopal V, reviewed by Gunther Hagleitner)
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Jan 15 22:22:14 2015
@@ -1615,6 +1615,10 @@ public class HiveConf extends Configurat
"inheriting the permission of the warehouse or database directory."),
HIVE_INSERT_INTO_EXTERNAL_TABLES("hive.insert.into.external.tables", true,
"whether insert into external tables is allowed"),
+ HIVE_TEMPORARY_TABLE_STORAGE(
+ "hive.exec.temporary.table.storage", "default", new StringSet("memory",
"ssd", "default"), "Define the storage policy for temporary tables. " +
"Choices between memory, ssd and default."),
HIVE_DRIVER_RUN_HOOKS("hive.exec.driver.run.hooks", "",
"A comma separated list of hooks which implement HiveDriverRunHook. Will be run at the beginning " +
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Jan 15 22:22:14 2015
@@ -68,11 +68,16 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
+
/**
* File Sink operator implementation.
**/
@@ -88,6 +93,7 @@ public class FileSinkOperator extends Te
protected transient List<String> dpColNames;
protected transient DynamicPartitionCtx dpCtx;
protected transient boolean isCompressed;
+ protected transient boolean isTemporary;
protected transient Path parent;
protected transient HiveOutputFormat<?, ?> hiveOutputFormat;
protected transient Path specPath;
@@ -318,6 +324,7 @@ public class FileSinkOperator extends Te
this.hconf = hconf;
filesCreated = false;
isNativeTable = !conf.getTableInfo().isNonNative();
+ isTemporary = conf.isTemporary();
multiFileSpray = conf.isMultiFileSpray();
totalFiles = conf.getTotalFiles();
numFiles = conf.getNumFiles();
@@ -384,6 +391,20 @@ public class FileSinkOperator extends Te
valToPaths.put("", fsp); // special entry for non-DP case
}
}
+
+ final StoragePolicyValue tmpStorage = StoragePolicyValue.lookup(HiveConf
+ .getVar(hconf, HIVE_TEMPORARY_TABLE_STORAGE));
+ if (isTemporary && fsp != null
+ && tmpStorage != StoragePolicyValue.DEFAULT) {
+ final Path outputPath = fsp.taskOutputTempPath;
+ StoragePolicyShim shim = ShimLoader.getHadoopShims()
+ .getStoragePolicyShim(fs);
+ if (shim != null) {
+ // directory creation is otherwise within the writers
+ fs.mkdirs(outputPath);
+ shim.setStoragePolicy(outputPath, tmpStorage);
+ }
+ }
if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
conf.getWriteType() == AcidUtils.Operation.DELETE) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Jan 15 22:22:14 2015
@@ -5857,6 +5857,7 @@ public class SemanticAnalyzer extends Ba
Table dest_tab = null; // destination table if any
boolean destTableIsAcid = false; // should the destination table be written to using ACID
+ boolean destTableIsTemporary = false;
Partition dest_part = null;// destination partition if any
Path queryTmpdir = null; // the intermediate destination directory
Path dest_path = null; // the final destination directory
@@ -5874,6 +5875,7 @@ public class SemanticAnalyzer extends Ba
dest_tab = qbm.getDestTableForAlias(dest);
destTableIsAcid = isAcidTable(dest_tab);
+ destTableIsTemporary = dest_tab.isTemporary();
// Is the user trying to insert into a external tables
if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
@@ -6143,6 +6145,7 @@ public class SemanticAnalyzer extends Ba
CreateTableDesc tblDesc = qb.getTableDesc();
if (tblDesc != null) {
field_schemas = new ArrayList<FieldSchema>();
+ destTableIsTemporary = tblDesc.isTemporary();
}
boolean first = true;
@@ -6287,6 +6290,8 @@ public class SemanticAnalyzer extends Ba
fileSinkDesc.setWriteType(wt);
acidFileSinks.add(fileSinkDesc);
}
+
+ fileSinkDesc.setTemporary(destTableIsTemporary);
/* Set List Bucketing context. */
if (lbCtx != null) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Thu Jan 15 22:22:14 2015
@@ -48,6 +48,7 @@ public class FileSinkDesc extends Abstra
private String compressCodec;
private String compressType;
private boolean multiFileSpray;
+ private boolean temporary;
// Whether the files output by this FileSink can be merged, e.g. if they are to be put into a
// bucketed or sorted table/partition they cannot be merged.
private boolean canBeMerged;
@@ -220,6 +221,21 @@ public class FileSinkDesc extends Abstra
public void setMultiFileSpray(boolean multiFileSpray) {
this.multiFileSpray = multiFileSpray;
}
+
+ /**
+ * @return destination is temporary
+ */
+ public boolean isTemporary() {
+ return temporary;
+ }
+
+ /**
+ * @param temporary whether the destination table is temporary
+ */
+ public void setTemporary(boolean temporary) {
+ this.temporary = temporary;
+ }
+
public boolean canBeMerged() {
return canBeMerged;
Modified: hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Thu Jan 15 22:22:14 2015
@@ -606,4 +606,9 @@ public class Hadoop20SShims extends Hado
return kerberosName.getShortName();
}
}
+
+ @Override
+ public StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
+ return null;
+ }
}
Modified: hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Thu Jan 15 22:22:14 2015
@@ -27,8 +27,10 @@ import java.security.AccessControlExcept
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
@@ -53,7 +55,10 @@ import org.apache.hadoop.fs.permission.A
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobConf;
@@ -91,15 +96,28 @@ public class Hadoop23Shims extends Hadoo
HadoopShims.MiniDFSShim cluster = null;
final boolean zeroCopy;
+ final boolean storagePolicy;
public Hadoop23Shims() {
boolean zcr = false;
+ boolean storage = false;
try {
Class.forName("org.apache.hadoop.fs.CacheFlag", false,
ShimLoader.class.getClassLoader());
zcr = true;
} catch (ClassNotFoundException ce) {
}
+
+ if (zcr) {
+ // in-memory HDFS is only available after zcr
+ try {
+ Class.forName("org.apache.hadoop.hdfs.protocol.BlockStoragePolicy",
+ false, ShimLoader.class.getClassLoader());
+ storage = true;
+ } catch (ClassNotFoundException ce) {
+ }
+ }
+ this.storagePolicy = storage;
this.zeroCopy = zcr;
}
@@ -935,4 +953,47 @@ public class Hadoop23Shims extends Hadoo
return kerberosName.getShortName();
}
}
+
+
+ public static class StoragePolicyShim implements HadoopShims.StoragePolicyShim {
+
+ private final DistributedFileSystem dfs;
+
+ public StoragePolicyShim(DistributedFileSystem fs) {
+ this.dfs = fs;
+ }
+
+ @Override
+ public void setStoragePolicy(Path path, StoragePolicyValue policy)
+ throws IOException {
+ switch (policy) {
+ case MEMORY: {
+ dfs.setStoragePolicy(path, HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
+ break;
+ }
+ case SSD: {
+ dfs.setStoragePolicy(path, HdfsConstants.ALLSSD_STORAGE_POLICY_NAME);
+ break;
+ }
+ case DEFAULT: {
+ /* do nothing */
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("Unknown storage policy " + policy);
+ }
+ }
+ }
+
+ @Override
+ public HadoopShims.StoragePolicyShim getStoragePolicyShim(FileSystem fs) {
+ if (!storagePolicy) {
+ return null;
+ }
+ try {
+ return new StoragePolicyShim((DistributedFileSystem) fs);
+ } catch (ClassCastException ce) {
+ return null;
+ }
+ }
}
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1652290&r1=1652289&r2=1652290&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Thu Jan 15 22:22:14 2015
@@ -26,6 +26,7 @@ import java.security.AccessControlExcept
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobConf;
@@ -388,6 +390,33 @@ public interface HadoopShims {
public FileSystem createProxyFileSystem(FileSystem fs, URI uri);
public Map<String, String> getHadoopConfNames();
+
+ /**
+ * Create a shim for DFS storage policy.
+ */
+
+ public enum StoragePolicyValue {
+ MEMORY, /* 1-replica memory */
+ SSD, /* 3-replica ssd */
+ DEFAULT /* system defaults (usually 3-replica disk) */;
+
+ public static StoragePolicyValue lookup(String name) {
+ if (name == null) {
+ return DEFAULT;
+ }
+ return StoragePolicyValue.valueOf(name.toUpperCase().trim());
+ }
+ };
+
+ public interface StoragePolicyShim {
+ void setStoragePolicy(Path path, StoragePolicyValue policy) throws IOException;
+ }
+
+ /**
+ * obtain a storage policy shim associated with the filesystem.
+ * Returns null when the filesystem has no storage policies.
+ */
+ public StoragePolicyShim getStoragePolicyShim(FileSystem fs);
/**
* a hadoop.io ByteBufferPool shim.