Posted to commits@hive.apache.org by na...@apache.org on 2010/04/15 01:36:09 UTC
svn commit: r934241 [2/16] - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/
metastore/src/java/org/apache/hadoop/hive/metastore/
ql/src/java/org/apache/hadoop/hive/ql/exec/ ql...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Apr 14 23:36:07 2010
@@ -47,7 +47,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -77,12 +76,13 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ErrorMsg;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.io.SequenceFile;
@@ -213,6 +213,7 @@ public final class Utilities {
protected boolean mutatesTo(Object oldInstance, Object newInstance) {
return false;
}
+ @Override
protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
java.util.Collection oldO = (java.util.Collection)oldInstance;
java.util.Collection newO = (java.util.Collection)newInstance;
@@ -237,6 +238,7 @@ public final class Utilities {
protected boolean mutatesTo(Object oldInstance, Object newInstance) {
return false;
}
+ @Override
protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
java.util.Collection oldO = (java.util.Collection)oldInstance;
java.util.Collection newO = (java.util.Collection)newInstance;
@@ -262,6 +264,7 @@ public final class Utilities {
protected boolean mutatesTo(Object oldInstance, Object newInstance) {
return false;
}
+ @Override
protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
java.util.Collection oldO = (java.util.Collection)oldInstance;
java.util.Collection newO = (java.util.Collection)newInstance;
@@ -339,6 +342,7 @@ public final class Utilities {
}
public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
+ @Override
protected Expression instantiate(Object oldInstance, Encoder out) {
return new Expression(oldInstance,
oldInstance.getClass(),
@@ -346,6 +350,7 @@ public final class Utilities {
null);
}
+ @Override
protected void initialize(Class type, Object oldInstance, Object newInstance,
Encoder out) {
Iterator ite = ((Collection) oldInstance).iterator();
@@ -712,8 +717,7 @@ public final class Utilities {
if (isCompressed) {
Class<? extends CompressionCodec> codecClass = FileOutputFormat
.getOutputCompressorClass(jc, DefaultCodec.class);
- CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(
- codecClass, jc);
+ CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
return codec.createOutputStream(out);
} else {
return (out);
@@ -736,8 +740,7 @@ public final class Utilities {
} else {
Class<? extends CompressionCodec> codecClass = FileOutputFormat
.getOutputCompressorClass(jc, DefaultCodec.class);
- CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(
- codecClass, jc);
+ CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
return codec.getDefaultExtension();
}
}
@@ -967,20 +970,30 @@ public final class Utilities {
private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
/**
+ * A local job name looks like "job_local_1_map_0000", where 1 is the job ID and 0000 is the task ID.
+ */
+ private static Pattern fileNameLocalTaskIdRegex = Pattern.compile(".*local.*_([0-9]*)$");
+
+ /**
* Get the task id from the filename. E.g., get "000000" out of
* "24931_r_000000_0" or "24931_r_000000_0.gz"
*/
public static String getTaskIdFromFilename(String filename) {
+ String taskId = filename;
Matcher m = fileNameTaskIdRegex.matcher(filename);
if (!m.matches()) {
- LOG.warn("Unable to get task id from file name: " + filename
- + ". Using full filename as task id.");
- return filename;
+ Matcher m2 = fileNameLocalTaskIdRegex.matcher(filename);
+ if (!m2.matches()) {
+ LOG.warn("Unable to get task id from file name: " + filename
+ + ". Using full filename as task id.");
+ } else {
+ taskId = m2.group(1);
+ }
} else {
- String taskId = m.group(1);
- LOG.debug("TaskId for " + filename + " = " + taskId);
- return taskId;
+ taskId = m.group(1);
}
+ LOG.debug("TaskId for " + filename + " = " + taskId);
+ return taskId;
}
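For reference, here is a minimal standalone sketch (not part of this patch; class and sample filenames are illustrative) that exercises the two patterns above on the filename shapes mentioned in the comments:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TaskIdRegexDemo {
      // same pattern strings as in Utilities above
      private static final Pattern CLUSTER = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
      private static final Pattern LOCAL = Pattern.compile(".*local.*_([0-9]*)$");

      static String taskId(String filename) {
        Matcher m = CLUSTER.matcher(filename);
        if (m.matches()) {
          return m.group(1);          // e.g. "000000" from "24931_r_000000_0.gz"
        }
        Matcher m2 = LOCAL.matcher(filename);
        if (m2.matches()) {
          return m2.group(1);         // e.g. "0000" from "job_local_1_map_0000"
        }
        return filename;              // fall back to the full filename
      }

      public static void main(String[] args) {
        System.out.println(taskId("24931_r_000000_0.gz"));  // prints 000000
        System.out.println(taskId("job_local_1_map_0000")); // prints 0000
      }
    }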
/**
@@ -990,6 +1003,11 @@ public final class Utilities {
*/
public static String replaceTaskIdFromFilename(String filename, int bucketNum) {
String taskId = getTaskIdFromFilename(filename);
+ String newTaskId = replaceTaskId(taskId, bucketNum);
+ return replaceTaskIdFromFilename(filename, taskId, newTaskId);
+ }
+
+ public static String replaceTaskId(String taskId, int bucketNum) {
String strBucketNum = String.valueOf(bucketNum);
int bucketNumLen = strBucketNum.length();
int taskIdLen = taskId.length();
@@ -997,17 +1015,30 @@ public final class Utilities {
for (int i = 0; i < taskIdLen - bucketNumLen; i++) {
s.append("0");
}
- String newTaskId = s.toString() + strBucketNum;
- String[] spl = filename.split(taskId);
+ return s.toString() + strBucketNum;
+ }
+
+ /**
+ * Replace the oldTaskId appearing in the filename with the newTaskId.
+ * The string oldTaskId may appear multiple times; only the last occurrence is replaced.
+ * @param filename
+ * @param oldTaskId
+ * @param newTaskId
+ * @return
+ */
+ public static String replaceTaskIdFromFilename(String filename,
+ String oldTaskId, String newTaskId) {
+
+ String[] spl = filename.split(oldTaskId);
if ((spl.length == 0) || (spl.length == 1)) {
- return filename.replaceAll(taskId, newTaskId);
+ return filename.replaceAll(oldTaskId, newTaskId);
}
StringBuffer snew = new StringBuffer();
for (int idx = 0; idx < spl.length-1; idx++) {
if (idx > 0) {
- snew.append(taskId);
+ snew.append(oldTaskId);
}
snew.append(spl[idx]);
}
@@ -1017,21 +1048,88 @@ public final class Utilities {
}
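To make the "replace only the last occurrence" rule in the javadoc above concrete, a rough standalone sketch (a hypothetical helper, not the committed implementation) could use lastIndexOf:

    public class ReplaceLastDemo {
      // Replace just the last occurrence of oldTaskId in filename.
      static String replaceLast(String filename, String oldTaskId, String newTaskId) {
        int idx = filename.lastIndexOf(oldTaskId);
        if (idx < 0) {
          return filename;            // nothing to replace
        }
        return filename.substring(0, idx) + newTaskId
            + filename.substring(idx + oldTaskId.length());
      }

      public static void main(String[] args) {
        // only the trailing "000001" is rewritten, not the one in the directory name
        System.out.println(replaceLast("000001/ds=1/000001_0", "000001", "000123"));
        // prints 000001/ds=1/000123_0
      }
    }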
/**
+ * Get all file statuses under a root path, recursing into subdirectories down to the given level.
+ * @param path the root path
+ * @param level the number of directory levels to descend
+ * @param fs the file system
+ * @return array of FileStatus
+ * @throws IOException
+ */
+ public static FileStatus[] getFileStatusRecurse(Path path, int level,
+ FileSystem fs) throws IOException {
+
+ // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
+ StringBuilder sb = new StringBuilder(path.toUri().getPath());
+ for (int i = 0; i < level; ++i) {
+ sb.append(Path.SEPARATOR).append("*");
+ }
+ Path pathPattern = new Path(path, sb.toString());
+ return fs.globStatus(pathPattern);
+ }
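A minimal sketch of the glob-pattern idea behind getFileStatusRecurse, assuming a local file system and an example root such as /tmp/warehouse/t (names and paths are illustrative only):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GlobLevelsDemo {
      // Build "<root>/*/*" for level = 2 and list everything that matches.
      static FileStatus[] listAtLevel(FileSystem fs, Path root, int level) throws IOException {
        StringBuilder pattern = new StringBuilder(root.toUri().getPath());
        for (int i = 0; i < level; ++i) {
          pattern.append(Path.SEPARATOR).append("*");
        }
        return fs.globStatus(new Path(pattern.toString()));
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        // e.g. matches /tmp/warehouse/t/ds=2010-03-03/hr=12 when level == 2
        FileStatus[] matches = listAtLevel(fs, new Path("/tmp/warehouse/t"), 2);
        if (matches != null) {
          for (FileStatus s : matches) {
            System.out.println(s.getPath());
          }
        }
      }
    }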
+
+ /**
+ * Remove all temporary files and duplicate (double-committed) files from a
+ * given directory.
+ */
+ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
+ removeTempOrDuplicateFiles(fs, path, null);
+ }
+
+ /**
* Remove all temporary files and duplicate (double-committed) files from a
* given directory.
+ * @return a list of path names corresponding to should-be-created empty buckets.
*/
- public static void removeTempOrDuplicateFiles(FileSystem fs, Path path)
+ public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path, DynamicPartitionCtx dpCtx)
throws IOException {
if (path == null) {
- return;
+ return null;
}
- FileStatus[] items = fs.listStatus(path);
- if (items == null) {
- return;
+ ArrayList<String> result = new ArrayList<String>();
+ if (dpCtx != null) {
+ FileStatus parts[] = getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
+ HashMap<String, FileStatus> taskIDToFile = null;
+
+ for (int i = 0; i < parts.length; ++i) {
+ assert parts[i].isDir(): "dynamic partition " + parts[i].getPath() + " is not a directory";
+ FileStatus[] items = fs.listStatus(parts[i].getPath());
+ taskIDToFile = removeTempOrDuplicateFiles(items, fs);
+ // if the table is bucketed and bucketing is enforced, check for and generate any missing buckets
+ if (dpCtx.getNumBuckets() > 0 && taskIDToFile != null) {
+ // refresh the file list
+ items = fs.listStatus(parts[i].getPath());
+ // get the missing buckets and generate empty buckets
+ String taskID1 = taskIDToFile.keySet().iterator().next();
+ Path bucketPath = taskIDToFile.values().iterator().next().getPath();
+ for (int j = 0; j < dpCtx.getNumBuckets(); ++j ) {
+ String taskID2 = replaceTaskId(taskID1, j);
+ if (!taskIDToFile.containsKey(taskID2)) {
+ // create an empty bucket; its file name is derived from taskID2
+ String path2 = replaceTaskIdFromFilename(bucketPath.toUri().getPath().toString(), j);
+ result.add(path2);
+ }
+ }
+ }
+ }
+ } else {
+ FileStatus[] items = fs.listStatus(path);
+ removeTempOrDuplicateFiles(items, fs);
+ }
+ return result;
+ }
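To illustrate the bucket back-filling above, a simplified standalone sketch (illustrative only) of deriving the missing bucket file names from one existing file; it uses a plain replace rather than the last-occurrence replacement of replaceTaskIdFromFilename:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class MissingBucketsDemo {
      // existingTaskIds: task ids already present in one partition directory
      // samplePath/sampleTaskId: an existing bucket file used as a naming template
      static List<String> missingBucketPaths(Set<String> existingTaskIds, String samplePath,
          String sampleTaskId, int numBuckets) {
        List<String> result = new ArrayList<String>();
        for (int j = 0; j < numBuckets; ++j) {
          // pad the bucket number to the width of the sample task id, e.g. 2 -> "000002"
          String bucketTaskId = String.format("%0" + sampleTaskId.length() + "d", j);
          if (!existingTaskIds.contains(bucketTaskId)) {
            result.add(samplePath.replace(sampleTaskId, bucketTaskId));
          }
        }
        return result;
      }

      public static void main(String[] args) {
        Set<String> existing = new HashSet<String>(Arrays.asList("000000", "000002"));
        // buckets 1 and 3 are missing out of 4; prints the two paths to create empty files for
        System.out.println(missingBucketPaths(existing, "/warehouse/t/ds=1/000000_0", "000000", 4));
      }
    }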
+
+ public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(
+ FileStatus[] items, FileSystem fs)
+ throws IOException {
+
+ if (items == null || fs == null) {
+ return null;
}
HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
+
for (FileStatus one : items) {
if (isTempPath(one)) {
if (!fs.delete(one.getPath(), true)) {
@@ -1053,6 +1151,7 @@ public final class Utilities {
}
}
}
+ return taskIdToFile;
}
public static String getNameMessage(Exception e) {
@@ -1230,4 +1329,5 @@ public final class Utilities {
job.set(entry.getKey(), entry.getValue());
}
}
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Wed Apr 14 23:36:07 2010
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -50,27 +50,30 @@ import org.apache.hadoop.util.Reflection
/**
* Simple persistent container for rows.
- *
+ *
* This container interface only accepts adding or appending new rows and
* iterating through the rows in the order of their insertions.
- *
+ *
* The iterator interface is a lightweight first()/next() API rather than the
* Java Iterator interface. This way we do not need to create an Iterator object
* every time we want to start a new iteration. Below is simple example of how
- * to convert a typical Java's Iterator code to the LW iterator iterface.
- *
- * Itereator itr = rowContainer.iterator(); while (itr.hasNext()) { v =
- * itr.next(); // do anything with v }
- *
+ * to convert typical Java Iterator code to the LW iterator interface.
+ *
+ * Iterator itr = rowContainer.iterator();
+ * while (itr.hasNext()) {
+ * v = itr.next(); // do anything with v
+ * }
+ *
* can be rewritten to:
- *
- * for ( v = rowContainer.first(); v != null; v = rowContainer.next()) { // do
- * anything with v }
- *
+ *
+ * for ( v = rowContainer.first(); v != null; v = rowContainer.next()) {
+ * // do anything with v
+ * }
+ *
* Once the first is called, it will not be able to write again. So there can
* not be any writes after read. It can be read multiple times, but it does not
* support multiple reader interleaving reading.
- *
+ *
*/
public class RowContainer<Row extends List<Object>> {
@@ -82,7 +85,7 @@ public class RowContainer<Row extends Li
private Row[] currentWriteBlock; // the last block that add() should append to
private Row[] currentReadBlock; // the current block where the cursor is in
// since currentReadBlock may assigned to currentWriteBlock, we need to store
- // orginal read block
+ // original read block
private Row[] firstReadBlockPointer;
private int blockSize; // number of objects in the block before it is spilled
// to disk
@@ -201,7 +204,7 @@ public class RowContainer<Row extends Li
} else {
if (inputSplits == null) {
if (this.inputFormat == null) {
- inputFormat = (InputFormat<WritableComparable, Writable>)ReflectionUtils
+ inputFormat = (InputFormat<WritableComparable, Writable>) ReflectionUtils
.newInstance(tblDesc.getInputFileFormatClass(),
jobCloneUsingLocalFs);
}
@@ -355,7 +358,7 @@ public class RowContainer<Row extends Li
/**
* Get the number of elements in the RowContainer.
- *
+ *
* @return number of elements in the RowContainer
*/
public int size() {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java Wed Apr 14 23:36:07 2010
@@ -60,10 +60,10 @@ public class WriteEntity implements Seri
/**
* This is derived from t and p, but we need to serialize this field to make sure
- * WriteEntity.hashCode() does not need to recursively read into t and p.
+ * WriteEntity.hashCode() does not need to recursively read into t and p.
*/
private String name;
-
+
public String getName() {
return name;
}
@@ -109,10 +109,10 @@ public class WriteEntity implements Seri
*/
public WriteEntity() {
}
-
+
/**
* Constructor for a table.
- *
+ *
* @param t
* Table that is written to.
*/
@@ -126,7 +126,7 @@ public class WriteEntity implements Seri
/**
* Constructor for a partition.
- *
+ *
* @param p
* Partition that is written to.
*/
@@ -140,7 +140,7 @@ public class WriteEntity implements Seri
/**
* Constructor for a file.
- *
+ *
* @param d
* The name of the directory that is being written to.
* @param islocal
@@ -205,7 +205,7 @@ public class WriteEntity implements Seri
public String toString() {
return name;
}
-
+
private String computeName() {
switch (typ) {
case TABLE:
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Wed Apr 14 23:36:07 2010
@@ -29,13 +29,13 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Apr 14 23:36:07 2010
@@ -19,8 +19,8 @@
package org.apache.hadoop.hive.ql.metadata;
import java.io.IOException;
-import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -537,7 +537,7 @@ public class Hive {
* The temporary directory.
*/
public void loadPartition(Path loadPath, String tableName,
- AbstractMap<String, String> partSpec, boolean replace, Path tmpDirPath)
+ Map<String, String> partSpec, boolean replace, Path tmpDirPath)
throws HiveException {
Table tbl = getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName);
try {
@@ -558,8 +558,8 @@ public class Hive {
// extrapolated from
// the table's location (even if the table is marked external)
fs = FileSystem.get(tbl.getDataLocation(), getConf());
- partPath = new Path(tbl.getDataLocation().getPath(), Warehouse
- .makePartName(partSpec));
+ partPath = new Path(tbl.getDataLocation().getPath(),
+ Warehouse.makePartName(partSpec));
} else {
// Partition exists already. Get the path from the partition. This will
// get the default path for Hive created partitions or the external path
@@ -588,6 +588,62 @@ public class Hive {
}
/**
+ * Given the source directory (load path), load all dynamically generated partitions
+ * into the specified table and return the list of full partition specs for the newly
+ * created dynamic partitions.
+ * @param loadPath
+ * @param tableName
+ * @param partSpec
+ * @param replace
+ * @param tmpDirPath
+ * @param numDP number of dynamic partition columns in the partition spec
+ * @return
+ * @throws HiveException
+ */
+ public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
+ String tableName, Map<String, String> partSpec, boolean replace,
+ Path tmpDirPath, int numDP)
+ throws HiveException {
+
+ try {
+ ArrayList<LinkedHashMap<String, String>> fullPartSpecs =
+ new ArrayList<LinkedHashMap<String, String>>();
+
+ FileSystem fs = loadPath.getFileSystem(conf);
+ FileStatus[] status = Utilities.getFileStatusRecurse(loadPath, numDP, fs);
+
+ if (status.length > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
+ throw new HiveException("Number of dynamic partitions created is " + status.length
+ + ", which is more than "
+ + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)
+ +". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+ + " to at least " + status.length + '.');
+ }
+
+ // for each dynamically created DP directory, construct a full partition spec
+ // and load the partition based on that
+ for (int i= 0; i < status.length; ++i) {
+ // get the dynamically created directory
+ Path partPath = status[i].getPath();
+ assert fs.getFileStatus(partPath).isDir():
+ "partitions " + partPath + " is not a directory !";
+
+ // generate a full partition specification
+ LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
+ Warehouse.makeSpecFromName(fullPartSpec, partPath);
+ fullPartSpecs.add(fullPartSpec);
+
+ // finally load the partition -- move the file to the final table address
+ loadPartition(partPath, tableName, fullPartSpec, replace, tmpDirPath);
+ LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+ }
+ return fullPartSpecs;
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+ }
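A simplified sketch of the spec-from-name step used above (the committed code delegates to Warehouse.makeSpecFromName); it assumes dynamically created directories of the form key=value:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SpecFromPathDemo {
      // Parse a relative partition path like "ds=2010-03-03/hr=12" into an ordered spec.
      static Map<String, String> specFromRelativePath(String relativePath) {
        Map<String, String> spec = new LinkedHashMap<String, String>();
        for (String component : relativePath.split("/")) {
          int eq = component.indexOf('=');
          if (eq > 0) {
            spec.put(component.substring(0, eq), component.substring(eq + 1));
          }
        }
        return spec;
      }

      public static void main(String[] args) {
        System.out.println(specFromRelativePath("ds=2010-03-03/hr=12"));
        // prints {ds=2010-03-03, hr=12}
      }
    }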
+
+ /**
* Load a directory into a Hive Table. - Alters existing content of table with
* the contents of loadPath. - If table does not exist - an exception is
* thrown - files in loadPath are moved into Hive. But the directory itself is
@@ -687,11 +743,14 @@ public class Hive {
List<String> pvals = new ArrayList<String>();
for (FieldSchema field : tbl.getPartCols()) {
String val = partSpec.get(field.getName());
- if (val == null || val.length() == 0) {
+ // enable dynamic partitioning
+ if (val == null && !HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)
+ || val.length() == 0) {
throw new HiveException("get partition: Value for key "
+ field.getName() + " is null or empty");
+ } else if (val != null){
+ pvals.add(val);
}
- pvals.add(val);
}
org.apache.hadoop.hive.metastore.api.Partition tpart = null;
try {
@@ -700,7 +759,6 @@ public class Hive {
LOG.debug("creating partition for table " + tbl.getTableName()
+ " with partition spec : " + partSpec);
tpart = getMSC().appendPartition(tbl.getDbName(), tbl.getTableName(), pvals);
- ;
}
if (tpart == null) {
return null;
@@ -845,8 +903,8 @@ public class Hive {
continue;
}
if (item.isDir()) {
- throw new HiveException("checkPaths: " + src.toString()
- + " has nested directory" + item.toString());
+ throw new HiveException("checkPaths: " + src.getPath()
+ + " has nested directory" + item.getPath());
}
Path tmpDest = new Path(destf, item.getPath().getName());
if (!replace && fs.exists(tmpDest)) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Wed Apr 14 23:36:07 2010
@@ -21,12 +21,12 @@ package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
-import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -69,10 +69,10 @@ public abstract class BaseSemanticAnalyz
protected Context ctx;
protected HashMap<String, String> idToTableNameMap;
-
+
public static int HIVE_COLUMN_ORDER_ASC = 1;
public static int HIVE_COLUMN_ORDER_DESC = 0;
-
+
/**
* ReadEntitites that are passed to the hooks.
*/
@@ -336,7 +336,7 @@ public abstract class BaseSemanticAnalyz
protected List<FieldSchema> getColumns(ASTNode ast) throws SemanticException {
return getColumns(ast, true);
}
-
+
/**
* Get the list of FieldSchema out of the ASTNode.
*/
@@ -437,14 +437,16 @@ public abstract class BaseSemanticAnalyz
public static class tableSpec {
public String tableName;
public Table tableHandle;
- public HashMap<String, String> partSpec;
+ public Map<String, String> partSpec; // has to use LinkedHashMap to enforce order
public Partition partHandle;
+ public int numDynParts; // number of dynamic partition columns
public tableSpec(Hive db, HiveConf conf, ASTNode ast)
throws SemanticException {
assert (ast.getToken().getType() == HiveParser.TOK_TAB);
int childIndex = 0;
+ numDynParts = 0;
try {
// get table metadata
@@ -468,28 +470,63 @@ public abstract class BaseSemanticAnalyz
if (ast.getChildCount() == 2) {
childIndex = 1;
ASTNode partspec = (ASTNode) ast.getChild(1);
- partSpec = new LinkedHashMap<String, String>();
+ // partSpec is a mapping from partition column name to its value.
+ partSpec = new LinkedHashMap<String, String>(partspec.getChildCount());
for (int i = 0; i < partspec.getChildCount(); ++i) {
ASTNode partspec_val = (ASTNode) partspec.getChild(i);
- String val = stripQuotes(partspec_val.getChild(1).getText());
- partSpec.put(unescapeIdentifier(partspec_val.getChild(0).getText()
- .toLowerCase()), val);
+ String val = null;
+ if (partspec_val.getChildCount() < 2) { // DP in the form of T partition (ds, hr)
+ ++numDynParts;
+ } else { // in the form of T partition (ds="2010-03-03")
+ val = stripQuotes(partspec_val.getChild(1).getText());
+ }
+ partSpec.put(unescapeIdentifier(partspec_val.getChild(0).getText().toLowerCase()), val);
}
- try {
- // In case the partition already exists, we need to get the partition
- // data from the metastore
- partHandle = db.getPartition(tableHandle, partSpec, false);
- if (partHandle == null) {
- // this doesn't create partition. partition is created in MoveTask
- partHandle = new Partition(tableHandle, partSpec, null);
+ // check if the partition spec is valid
+ if (numDynParts > 0) {
+ List<FieldSchema> parts = tableHandle.getPartitionKeys();
+ int numStaPart = parts.size() - numDynParts;
+ if (numStaPart == 0 &&
+ conf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+ throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
}
- } catch (HiveException e) {
- throw new SemanticException(ErrorMsg.INVALID_PARTITION.getMsg(ast
- .getChild(childIndex)));
+ for (FieldSchema fs: parts) {
+ if (partSpec.get(fs.getName().toLowerCase()) == null) {
+ if (numStaPart > 0) { // found a DP column while static partition columns remain, i.e. SP under DP
+ throw new SemanticException(
+ ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg(ast.getChild(childIndex)));
+ }
+ break;
+ } else {
+ --numStaPart;
+ }
+ }
+ partHandle = null;
+ } else {
+ try {
+ // In case the partition already exists, we need to get the partition
+ // data from the metastore
+ partHandle = db.getPartition(tableHandle, partSpec, false);
+ if (partHandle == null) {
+ // this doesn't create partition. partition is created in MoveTask
+ partHandle = new Partition(tableHandle, partSpec, null);
+ }
+ } catch (HiveException e) {
+ throw new SemanticException(
+ ErrorMsg.INVALID_PARTITION.getMsg(ast.getChild(childIndex)));
+ }
}
}
}
+ public Map<String, String> getPartSpec() {
+ return this.partSpec;
+ }
+
+ public void setPartSpec(Map<String, String> partSpec) {
+ this.partSpec = partSpec;
+ }
+
@Override
public String toString() {
if (partHandle != null) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java Wed Apr 14 23:36:07 2010
@@ -137,6 +137,15 @@ public enum ErrorMsg {
VIEW_COL_MISMATCH("The number of columns produced by the SELECT clause does not match the "
+ "number of column names specified by CREATE VIEW"),
DML_AGAINST_VIEW("A view cannot be used as target table for LOAD or INSERT"),
+ PARTITION_DYN_STA_ORDER("Dynamic partition cannot be the parent of a static partition"),
+ DYNAMIC_PARTITION_DISABLED("Dynamic partition is disabled. Either enable it by setting "
+ + "hive.exec.dynamic.partition=true or specify partition column values"),
+ DYNAMIC_PARTITION_STRICT_MODE("Dynamic partition strict mode requires at least one "
+ + "static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict"),
+ DYNAMIC_PARTITION_MERGE("Dynamic partition does not support merging mapfiles/mapredfiles yet. "
+ + "Please set hive.merge.mapfiles and hive.merge.mapredfiles to false or use static "
+ + "partitions"),
+ NONEXISTPARTCOL("Partition column in the partition specification does not exist"),
UNSUPPORTED_TYPE("DATE, DATETIME, and TIMESTAMP types aren't supported yet. Please use "
+ "STRING instead."),
CREATE_NON_NATIVE_AS("CREATE TABLE AS SELECT cannot be used for a non-native table"),
@@ -168,7 +177,7 @@ public enum ErrorMsg {
* is not found or <code>ErrorMsg</code> has no <code>SQLState</code>, returns
* the <code>SQLState</code> bound to the <code>GENERIC_ERROR</code>
* <code>ErrorMsg</code>.
- *
+ *
* @param mesg
* An error message string
* @return SQLState
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g Wed Apr 14 23:36:07 2010
@@ -1391,7 +1391,7 @@ partitionSpec
partitionVal
:
- Identifier EQUAL constant -> ^(TOK_PARTVAL Identifier constant)
+ Identifier (EQUAL constant)? -> ^(TOK_PARTVAL Identifier constant?)
;
sysFuncNames
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java Wed Apr 14 23:36:07 2010
@@ -22,8 +22,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import org.antlr.runtime.tree.Tree;
import org.apache.commons.lang.StringUtils;
@@ -227,10 +228,12 @@ public class LoadSemanticAnalyzer extend
// create final load/move work
String loadTmpPath = ctx.getExternalTmpFileURI(toURI);
+ Map<String, String> partSpec = ts.getPartSpec();
+ if (partSpec == null) {
+ partSpec = new LinkedHashMap<String, String>();
+ }
LoadTableDesc loadTableWork = new LoadTableDesc(fromURI.toString(),
- loadTmpPath, Utilities.getTableDesc(ts.tableHandle),
- (ts.partSpec != null) ? ts.partSpec : new HashMap<String, String>(),
- isOverWrite);
+ loadTmpPath, Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
if (rTask != null) {
rTask.addDependentTask(TaskFactory.get(new MoveWork(getInputs(),
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java Wed Apr 14 23:36:07 2010
@@ -19,15 +19,17 @@
package org.apache.hadoop.hive.ql.parse;
import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
/**
* Implementation of the metadata information related to a query block.
- *
+ *
**/
public class QBMetaData {
@@ -44,6 +46,8 @@ public class QBMetaData {
private final HashMap<String, Partition> nameToDestPartition;
private final HashMap<String, String> nameToDestFile;
private final HashMap<String, Integer> nameToDestType;
+ private final HashMap<String, Map<String, String>> aliasToPartSpec;
+ private final HashMap<String, DynamicPartitionCtx> aliasToDPCtx;
@SuppressWarnings("unused")
private static final Log LOG = LogFactory.getLog(QBMetaData.class.getName());
@@ -54,6 +58,8 @@ public class QBMetaData {
nameToDestPartition = new HashMap<String, Partition>();
nameToDestFile = new HashMap<String, String>();
nameToDestType = new HashMap<String, Integer>();
+ aliasToPartSpec = new HashMap<String, Map<String, String>>();
+ aliasToDPCtx = new HashMap<String, DynamicPartitionCtx>();
}
// All getXXX needs toLowerCase() because they are directly called from
@@ -109,4 +115,21 @@ public class QBMetaData {
public Table getSrcForAlias(String alias) {
return aliasToTable.get(alias.toLowerCase());
}
+
+ public Map<String, String> getPartSpecForAlias(String alias) {
+ return aliasToPartSpec.get(alias);
+ }
+
+ public void setPartSpecForAlias(String alias, Map<String, String> partSpec) {
+ aliasToPartSpec.put(alias, partSpec);
+
+ }
+
+ public void setDPCtx(String alias, DynamicPartitionCtx dpCtx) {
+ aliasToDPCtx.put(alias, dpCtx);
+ }
+
+ public DynamicPartitionCtx getDPCtx(String alias) {
+ return aliasToDPCtx.get(alias);
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Apr 14 23:36:07 2010
@@ -31,9 +31,9 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
-import java.util.Map.Entry;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1;
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2;
import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
@@ -97,7 +98,6 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
import org.apache.hadoop.hive.ql.optimizer.Optimizer;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -107,6 +107,7 @@ import org.apache.hadoop.hive.ql.plan.Cr
import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc;
import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -116,6 +117,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.plan.ForwardDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
@@ -136,12 +138,11 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.ql.plan.UnionDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
@@ -149,9 +150,9 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -779,15 +780,23 @@ public class SemanticAnalyzer extends Ba
.getMsg(ast, "The class is " + outputFormatClass.toString()));
}
- if (ts.partSpec == null) {
+ // tableSpec ts comes from the query (user specified).
+ // A null partHandle means the user did not specify a complete partition in the query,
+ // but whether the table itself is partitioned is not known yet.
+ if (ts.partHandle == null) {
// This is a table
qb.getMetaData().setDestForAlias(name, ts.tableHandle);
+ // has dynamic as well as static partitions
+ if (ts.partSpec != null && ts.partSpec.size() > 0) {
+ qb.getMetaData().setPartSpecForAlias(name, ts.partSpec);
+ }
} else {
// This is a partition
qb.getMetaData().setDestForAlias(name, ts.partHandle);
}
break;
}
+
case HiveParser.TOK_LOCAL_DIR:
case HiveParser.TOK_DIR: {
// This is a dfs file
@@ -3152,25 +3161,65 @@ public class SemanticAnalyzer extends Ba
int currentTableId = 0;
boolean isLocal = false;
SortBucketRSCtx rsCtx = new SortBucketRSCtx();
+ DynamicPartitionCtx dpCtx = null;
LoadTableDesc ltd = null;
switch (dest_type.intValue()) {
case QBMetaData.DEST_TABLE: {
dest_tab = qbm.getDestTableForAlias(dest);
+ Map<String, String> partSpec = qbm.getPartSpecForAlias(dest);
+ dest_path = dest_tab.getPath();
// check for partition
List<FieldSchema> parts = dest_tab.getPartitionKeys();
- if (parts != null && parts.size() > 0) {
- throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
+ if (parts != null && parts.size() > 0) { // table is partitioned
+ if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
+ throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
+ }
+ dpCtx = qbm.getDPCtx(dest);
+ if (dpCtx == null) {
+ validatePartSpec(dest_tab, partSpec);
+ dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
+ conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
+ conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
+ qbm.setDPCtx(dest, dpCtx);
+ }
+
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
+ // TODO: we should support merge files for dynamically generated partitions later
+ if (dpCtx.getNumDPCols() > 0 &&
+ (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES) ||
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES))) {
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
+ }
+ // turn on hive.task.progress to update # of partitions created to the JT
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS, true);
+
+ } else { // QBMetaData.DEST_PARTITION captures the all-SP case
+ throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg());
+ }
+ if (dpCtx.getSPPath() != null) {
+ dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath());
+ }
+ if ((dest_tab.getNumBuckets() > 0) &&
+ (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
+ dpCtx.setNumBuckets(dest_tab.getNumBuckets());
+ }
}
- dest_path = dest_tab.getPath();
+
boolean isNonNativeTable = dest_tab.isNonNative();
if (isNonNativeTable) {
queryTmpdir = dest_path.toUri().getPath();
} else {
queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
}
+ if (dpCtx != null) {
+ // set the root of the temporary path under which the dynamic partition directories will be created
+ dpCtx.setRootPath(queryTmpdir);
+ }
+ // this table_desc does not contain the partitioning columns
table_desc = Utilities.getTableDesc(dest_tab);
// Add sorting/bucketing if needed
@@ -3181,13 +3230,18 @@ public class SemanticAnalyzer extends Ba
destTableId++;
// Create the work for moving the table
+ // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
if (!isNonNativeTable) {
- ltd = new LoadTableDesc(queryTmpdir, ctx
- .getExternalTmpFileURI(dest_path.toUri()), table_desc,
- new HashMap<String, String>());
+ ltd = new LoadTableDesc(queryTmpdir, ctx.getExternalTmpFileURI(dest_path.toUri()),
+ table_desc, dpCtx);
loadTableWork.add(ltd);
}
- if (!outputs.add(new WriteEntity(dest_tab))) {
+
+ // Only register the whole table as a WriteEntity for the post-exec hook if no DP is present.
+ // In the DP case, WriteEntities are registered in MoveTask once the list of
+ // dynamically created partitions is known.
+ if ((dpCtx == null || dpCtx.getNumDPCols() == 0) &&
+ !outputs.add(new WriteEntity(dest_tab))) {
throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
.getMsg(dest_tab.getTableName()));
}
@@ -3333,7 +3387,7 @@ public class SemanticAnalyzer extends Ba
throw new SemanticException("Unknown destination type: " + dest_type);
}
- input = genConversionSelectOperator(dest, qb, input, table_desc);
+ input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);
inputRR = opParseCtx.get(input).getRR();
ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
@@ -3354,11 +3408,19 @@ public class SemanticAnalyzer extends Ba
RowSchema fsRS = new RowSchema(vecCol);
- Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new FileSinkDesc(queryTmpdir, table_desc, conf
- .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId,
- rsCtx.isMultiFileSpray(), rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols()),
- fsRS, input), inputRR);
+ Operator output = putOpInsertMap(
+ OperatorFactory.getAndMakeChild(
+ new FileSinkDesc(
+ queryTmpdir,
+ table_desc,
+ conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT),
+ currentTableId,
+ rsCtx.isMultiFileSpray(),
+ rsCtx.getNumFiles(),
+ rsCtx.getTotalFiles(),
+ rsCtx.getPartnCols(),
+ dpCtx),
+ fsRS, input), inputRR);
if (ltd != null && SessionState.get() != null) {
@@ -3372,12 +3434,26 @@ public class SemanticAnalyzer extends Ba
return output;
}
+ private void validatePartSpec(Table tbl, Map<String, String> partSpec)
+ throws SemanticException {
+ List<FieldSchema> parts = tbl.getPartitionKeys();
+ Set<String> partCols = new HashSet<String>(parts.size());
+ for (FieldSchema col: parts) {
+ partCols.add(col.getName());
+ }
+ for (String col: partSpec.keySet()) {
+ if (!partCols.contains(col)) {
+ throw new SemanticException(ErrorMsg.NONEXISTPARTCOL.getMsg());
+ }
+ }
+ }
+
/**
* Generate the conversion SelectOperator that converts the columns into the
* types that are expected by the table_desc.
*/
Operator genConversionSelectOperator(String dest, QB qb, Operator input,
- TableDesc table_desc) throws SemanticException {
+ TableDesc table_desc, DynamicPartitionCtx dpCtx) throws SemanticException {
StructObjectInspector oi = null;
try {
Deserializer deserializer = table_desc.getDeserializerClass()
@@ -3390,13 +3466,20 @@ public class SemanticAnalyzer extends Ba
// Check column number
List<? extends StructField> tableFields = oi.getAllStructFieldRefs();
+ boolean dynPart = HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING);
ArrayList<ColumnInfo> rowFields = opParseCtx.get(input).getRR()
.getColumnInfos();
if (tableFields.size() != rowFields.size()) {
- String reason = "Table " + dest + " has " + tableFields.size()
- + " columns but query has " + rowFields.size() + " columns.";
- throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
- qb.getParseInfo().getDestForClause(dest), reason));
+ if (!dynPart || dpCtx == null ||
+ tableFields.size() + dpCtx.getNumDPCols() != rowFields.size()) {
+ String reason = "Table " + dest + " has " + tableFields.size()
+ + " columns but query has " + rowFields.size() + " columns.";
+ throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
+ qb.getParseInfo().getDestForClause(dest), reason));
+ } else {
+ // create the mapping from input ExprNode to dest table DP column
+ dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
+ }
}
// Check column types
@@ -3405,13 +3488,14 @@ public class SemanticAnalyzer extends Ba
ArrayList<ExprNodeDesc> expressions = new ArrayList<ExprNodeDesc>(
columnNumber);
// MetadataTypedColumnsetSerDe does not need type conversions because it
- // does
- // the conversion to String by itself.
+ // does the conversion to String by itself.
boolean isMetaDataSerDe = table_desc.getDeserializerClass().equals(
MetadataTypedColumnsetSerDe.class);
boolean isLazySimpleSerDe = table_desc.getDeserializerClass().equals(
LazySimpleSerDe.class);
if (!isMetaDataSerDe) {
+
+ // this loop only deals with non-partition columns; partition columns are handled next
for (int i = 0; i < columnNumber; i++) {
ObjectInspector tableFieldOI = tableFields.get(i)
.getFieldObjectInspector();
@@ -3447,6 +3531,18 @@ public class SemanticAnalyzer extends Ba
}
}
+ // deal with dynamic partition columns: convert ExprNodeDesc type to String??
+ if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
+ // DP columns starts with tableFields.size()
+ for (int i = tableFields.size(); i < rowFields.size(); ++i ) {
+ TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
+ ExprNodeDesc column = new ExprNodeColumnDesc(
+ rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", false);
+ expressions.add(column);
+ }
+ // converted = true; // [TODO]: should we check & convert type to String and set it to true?
+ }
+
if (converted) {
// add the select operator
RowResolver rowResolver = new RowResolver();
@@ -6321,9 +6417,14 @@ public class SemanticAnalyzer extends Ba
if (cols.size() != 0) {
throw new SemanticException(ErrorMsg.CTAS_COLLST_COEXISTENCE.getMsg());
}
- // TODO: support partition for CTAS?
if (partCols.size() != 0 || bucketCols.size() != 0) {
- throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
+ boolean dynPart = HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING);
+ if (dynPart == false) {
+ throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
+ } else {
+ // TODO: support dynamic partition for CTAS
+ throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
+ }
}
if (isExt) {
throw new SemanticException(ErrorMsg.CTAS_EXTTBL_COEXISTENCE.getMsg());
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java Wed Apr 14 23:36:07 2010
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+public class DynamicPartitionCtx implements Serializable {
+
+ /**
+ * default serialization ID
+ */
+ private static final long serialVersionUID = 1L;
+
+ private Map<String, String> partSpec; // partSpec is an ORDERED hash map
+ private int numDPCols; // number of dynamic partition columns
+ private int numSPCols; // number of static partition columns
+ private String spPath; // path name corresponding to SP columns
+ private String rootPath; // the root path DP columns paths start from
+ private int numBuckets; // number of buckets in each partition
+
+ private Map<String, String> inputToDPCols; // mapping from input column names to DP columns
+
+ private List<String> spNames; // sp column names
+ private List<String> dpNames; // dp column names
+ private String defaultPartName; // default partition name in case of null or empty value
+ private int maxPartsPerNode; // maximum dynamic partitions created per mapper/reducer
+
+ public DynamicPartitionCtx() {
+ }
+
+ public DynamicPartitionCtx(Table tbl, Map<String, String> partSpec, String defaultPartName,
+ int maxParts) {
+ this.partSpec = partSpec;
+ this.spNames = new ArrayList<String>();
+ this.dpNames = new ArrayList<String>();
+ this.numBuckets = 0;
+ this.maxPartsPerNode = maxParts;
+ this.defaultPartName = defaultPartName;
+
+ for (Map.Entry<String, String> me: partSpec.entrySet()) {
+ if (me.getValue() == null) {
+ dpNames.add(me.getKey());
+ } else {
+ spNames.add(me.getKey());
+ }
+ }
+ this.numDPCols = dpNames.size();
+ this.numSPCols = spNames.size();
+ this.inputToDPCols = new HashMap<String, String>();
+ if (this.numSPCols > 0) {
+ this.spPath = Warehouse.makeDynamicPartName(partSpec);
+ } else {
+ this.spPath = null;
+ }
+ }
+
+ public void mapInputToDP(List<ColumnInfo> fs) {
+
+ assert fs.size() == this.numDPCols: "input DP column size != numDPCols";
+
+ Iterator<ColumnInfo> itr1 = fs.iterator();
+ Iterator<String> itr2 = dpNames.iterator();
+
+ while (itr1.hasNext() && itr2.hasNext()) {
+ inputToDPCols.put(itr1.next().getInternalName(), itr2.next());
+ }
+ }
+
+ public int getMaxPartitionsPerNode() {
+ return this.maxPartsPerNode;
+ }
+
+ public void setMaxPartitionsPerNode(int maxParts) {
+ this.maxPartsPerNode = maxParts;
+ }
+
+ public String getDefaultPartitionName() {
+ return this.defaultPartName;
+ }
+
+ public void setDefaultPartitionName(String pname) {
+ this.defaultPartName = pname;
+ }
+
+ public void setNumBuckets(int bk) {
+ this.numBuckets = bk;
+ }
+
+ public int getNumBuckets() {
+ return this.numBuckets;
+ }
+
+ public void setRootPath(String root) {
+ this.rootPath = root;
+ }
+
+ public String getRootPath() {
+ return this.rootPath;
+ }
+
+ public List<String> getDPColNames() {
+ return this.dpNames;
+ }
+
+ public void setDPColNames(List<String> dp) {
+ this.dpNames = dp;
+ }
+
+ public List<String> getSPColNames() {
+ return this.spNames;
+ }
+
+ public void setPartSpec(Map<String, String> ps) {
+ this.partSpec = ps;
+ }
+
+ public Map<String, String> getPartSpec() {
+ return this.partSpec;
+ }
+
+ public void setSPColNames(List<String> sp) {
+ this.spNames = sp;
+ }
+
+ public Map<String, String> getInputToDPCols() {
+ return this.inputToDPCols;
+ }
+
+ public void setInputToDPCols(Map<String, String> map) {
+ this.inputToDPCols = map;
+ }
+
+ public void setNumDPCols(int dp) {
+ this.numDPCols = dp;
+ }
+
+ public int getNumDPCols() {
+ return this.numDPCols;
+ }
+
+ public void setNumSPCols(int sp) {
+ this.numSPCols = sp;
+ }
+
+ public int getNumSPCols() {
+ return this.numSPCols;
+ }
+
+ public void setSPPath(String sp) {
+ this.spPath = sp;
+ }
+
+ public String getSPPath() {
+ return this.spPath;
+ }
+}
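For orientation, a small standalone sketch (not part of the patch) of the static/dynamic split performed in the DynamicPartitionCtx constructor above, using a hypothetical partition spec where hr is left unspecified (i.e. dynamic):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class DpSplitDemo {
      public static void main(String[] args) {
        // INSERT ... PARTITION (ds='2010-03-03', hr) -> ds is static, hr is dynamic
        Map<String, String> partSpec = new LinkedHashMap<String, String>();
        partSpec.put("ds", "2010-03-03");
        partSpec.put("hr", null);

        List<String> spNames = new ArrayList<String>();
        List<String> dpNames = new ArrayList<String>();
        for (Map.Entry<String, String> e : partSpec.entrySet()) {
          if (e.getValue() == null) {
            dpNames.add(e.getKey());   // dynamic: value filled in at runtime
          } else {
            spNames.add(e.getKey());   // static: value given in the query
          }
        }
        System.out.println("static=" + spNames + " dynamic=" + dpNames);
        // prints: static=[ds] dynamic=[hr]
      }
    }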
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Wed Apr 14 23:36:07 2010
@@ -38,13 +38,15 @@ public class FileSinkDesc implements Ser
private int totalFiles;
private ArrayList<ExprNodeDesc> partitionCols;
private int numFiles;
+ private DynamicPartitionCtx dpCtx;
public FileSinkDesc() {
}
public FileSinkDesc(final String dirName, final TableDesc tableInfo,
final boolean compressed, final int destTableId, final boolean multiFileSpray,
- final int numFiles, final int totalFiles, final ArrayList<ExprNodeDesc> partitionCols) {
+ final int numFiles, final int totalFiles, final ArrayList<ExprNodeDesc> partitionCols,
+ final DynamicPartitionCtx dpCtx) {
this.dirName = dirName;
this.tableInfo = tableInfo;
@@ -54,6 +56,7 @@ public class FileSinkDesc implements Ser
this.numFiles = numFiles;
this.totalFiles = totalFiles;
this.partitionCols = partitionCols;
+ this.dpCtx = dpCtx;
}
public FileSinkDesc(final String dirName, final TableDesc tableInfo,
@@ -179,4 +182,12 @@ public class FileSinkDesc implements Ser
public void setNumFiles(int numFiles) {
this.numFiles = numFiles;
}
+
+ public void setDynPartCtx(DynamicPartitionCtx dpc) {
+ this.dpCtx = dpc;
+ }
+
+ public DynamicPartitionCtx getDynPartCtx() {
+ return this.dpCtx;
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java Wed Apr 14 23:36:07 2010
@@ -26,7 +26,7 @@ import java.util.Map;
/**
* Join operator Descriptor implementation.
- *
+ *
*/
@Explain(displayName = "Join Operator")
public class JoinDesc implements Serializable {
@@ -49,7 +49,7 @@ public class JoinDesc implements Seriali
private Map<Byte, List<ExprNodeDesc>> exprs;
// used for create joinOutputObjectInspector
- protected java.util.ArrayList<java.lang.String> outputColumnNames;
+ protected List<String> outputColumnNames;
// key:column output name, value:tag
private transient Map<String, Byte> reversedExprs;
@@ -66,7 +66,7 @@ public class JoinDesc implements Seriali
}
public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
- ArrayList<String> outputColumnNames, final boolean noOuterJoin,
+ List<String> outputColumnNames, final boolean noOuterJoin,
final JoinCondDesc[] conds) {
this.exprs = exprs;
this.outputColumnNames = outputColumnNames;
@@ -80,12 +80,12 @@ public class JoinDesc implements Seriali
}
public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
- ArrayList<String> outputColumnNames) {
+ List<String> outputColumnNames) {
this(exprs, outputColumnNames, true, null);
}
public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
- ArrayList<String> outputColumnNames, final JoinCondDesc[] conds) {
+ List<String> outputColumnNames, final JoinCondDesc[] conds) {
this(exprs, outputColumnNames, false, conds);
}
@@ -150,12 +150,12 @@ public class JoinDesc implements Seriali
}
@Explain(displayName = "outputColumnNames")
- public java.util.ArrayList<java.lang.String> getOutputColumnNames() {
+ public List<String> getOutputColumnNames() {
return outputColumnNames;
}
public void setOutputColumnNames(
- java.util.ArrayList<java.lang.String> outputColumnNames) {
+ List<String> outputColumnNames) {
this.outputColumnNames = outputColumnNames;
}
@@ -191,7 +191,7 @@ public class JoinDesc implements Seriali
/**
* The order in which tables should be processed when joining.
- *
+ *
* @return Array of tags
*/
public Byte[] getTagOrder() {
@@ -200,7 +200,7 @@ public class JoinDesc implements Seriali
/**
* The order in which tables should be processed when joining.
- *
+ *
* @param tagOrder
* Array of tags
*/
@@ -215,7 +215,7 @@ public class JoinDesc implements Seriali
/**
* set to handle skew join in this join op.
- *
+ *
* @param handleSkewJoin
*/
public void setHandleSkewJoin(boolean handleSkewJoin) {
@@ -231,7 +231,7 @@ public class JoinDesc implements Seriali
/**
* set the mapping from tbl to dir for big keys.
- *
+ *
* @param bigKeysDirMap
*/
public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
@@ -247,7 +247,7 @@ public class JoinDesc implements Seriali
/**
* set the mapping from tbl to dir for small keys.
- *
+ *
* @param bigKeysDirMap
*/
public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
@@ -264,7 +264,7 @@ public class JoinDesc implements Seriali
/**
* set skew key definition.
- *
+ *
* @param skewKeyDefinition
*/
public void setSkewKeyDefinition(int skewKeyDefinition) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java Wed Apr 14 23:36:07 2010
@@ -19,7 +19,8 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
/**
* LoadTableDesc.
@@ -30,31 +31,49 @@ public class LoadTableDesc extends org.a
private static final long serialVersionUID = 1L;
private boolean replace;
private String tmpDir;
+ private DynamicPartitionCtx dpCtx;
// TODO: the below seems like they should just be combined into partitionDesc
private org.apache.hadoop.hive.ql.plan.TableDesc table;
- private HashMap<String, String> partitionSpec;
+ private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map
public LoadTableDesc() {
}
public LoadTableDesc(final String sourceDir, final String tmpDir,
final org.apache.hadoop.hive.ql.plan.TableDesc table,
- final HashMap<String, String> partitionSpec, final boolean replace) {
-
+ final Map<String, String> partitionSpec, final boolean replace) {
super(sourceDir);
- this.tmpDir = tmpDir;
- this.table = table;
- this.partitionSpec = partitionSpec;
- this.replace = replace;
+ init(sourceDir, tmpDir, table, partitionSpec, replace);
}
public LoadTableDesc(final String sourceDir, final String tmpDir,
final org.apache.hadoop.hive.ql.plan.TableDesc table,
- final HashMap<String, String> partitionSpec) {
+ final Map<String, String> partitionSpec) {
this(sourceDir, tmpDir, table, partitionSpec, true);
}
+ public LoadTableDesc(final String sourceDir, final String tmpDir,
+ final org.apache.hadoop.hive.ql.plan.TableDesc table,
+ final DynamicPartitionCtx dpCtx) {
+ super(sourceDir);
+ this.dpCtx = dpCtx;
+ if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
+ init(sourceDir, tmpDir, table, dpCtx.getPartSpec(), true);
+ } else {
+ init(sourceDir, tmpDir, table, new LinkedHashMap<String, String>(), true);
+ }
+ }
+
+ private void init(final String sourceDir, final String tmpDir,
+ final org.apache.hadoop.hive.ql.plan.TableDesc table,
+ final Map<String, String> partitionSpec, final boolean replace) {
+ this.tmpDir = tmpDir;
+ this.table = table;
+ this.partitionSpec = partitionSpec;
+ this.replace = replace;
+ }
+
@Explain(displayName = "tmp directory", normalExplain = false)
public String getTmpDir() {
return tmpDir;
@@ -74,11 +93,11 @@ public class LoadTableDesc extends org.a
}
@Explain(displayName = "partition")
- public HashMap<String, String> getPartitionSpec() {
+ public Map<String, String> getPartitionSpec() {
return partitionSpec;
}
- public void setPartitionSpec(final HashMap<String, String> partitionSpec) {
+ public void setPartitionSpec(final Map<String, String> partitionSpec) {
this.partitionSpec = partitionSpec;
}
@@ -90,4 +109,12 @@ public class LoadTableDesc extends org.a
public void setReplace(boolean replace) {
this.replace = replace;
}
+
+ public DynamicPartitionCtx getDPCtx() {
+ return dpCtx;
+ }
+
+ public void setDPCtx(final DynamicPartitionCtx dpCtx) {
+ this.dpCtx = dpCtx;
+ }
}
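
A small sketch of how a caller might satisfy the ordered-map note above (illustrative only, assuming placeholder values for sourceDir, tmpDir, table and dpCtx; none of this code is in the patch):

    // LinkedHashMap preserves insertion order, so the spec's columns stay in
    // the same order as the table's partition columns (ds before hr here).
    Map<String, String> partSpec = new LinkedHashMap<String, String>();
    partSpec.put("ds", "2008-04-08");
    partSpec.put("hr", "11");
    LoadTableDesc ltd = new LoadTableDesc(sourceDir, tmpDir, table, partSpec, true);

    // With dynamic partitions, the new DynamicPartitionCtx-based constructor can
    // be used instead; per the diff above it takes the spec from dpCtx when one
    // is present and otherwise falls back to an empty LinkedHashMap.
    LoadTableDesc dynLtd = new LoadTableDesc(sourceDir, tmpDir, table, dpCtx);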
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Wed Apr 14 23:36:07 2010
@@ -25,8 +25,8 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
/**
* Map Join operator Descriptor implementation.
@@ -67,7 +67,7 @@ public class MapJoinDesc extends JoinDes
public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
final TableDesc keyTblDesc, final Map<Byte, List<ExprNodeDesc>> values,
- final List<TableDesc> valueTblDescs, ArrayList<String> outputColumnNames,
+ final List<TableDesc> valueTblDescs, List<String> outputColumnNames,
final int posBigTable, final JoinCondDesc[] conds) {
super(values, outputColumnNames, conds);
this.keys = keys;
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java Wed Apr 14 23:36:07 2010
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -37,6 +38,7 @@ public class MoveWork implements Seriali
private LoadFileDesc loadFileWork;
private boolean checkFileFormat;
+ ArrayList<String> dpSpecPaths; // dynamic partition specified paths -- the root of DP columns
/**
* ReadEntitites that are passed to the hooks.
@@ -69,6 +71,14 @@ public class MoveWork implements Seriali
this.checkFileFormat = checkFileFormat;
}
+ public void setDPSpecPaths(ArrayList<String> dpsp) {
+ dpSpecPaths = dpsp;
+ }
+
+ public ArrayList<String> getDPSpecPaths() {
+ return dpSpecPaths;
+ }
+
@Explain(displayName = "tables")
public LoadTableDesc getLoadTableWork() {
return loadTableWork;
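
A hedged sketch of the new dpSpecPaths accessors (the concrete path is a made-up example, and moveWork stands in for an existing MoveWork instance):

    // Root directories under which dynamic-partition subdirectories were written.
    ArrayList<String> dpRoots = new ArrayList<String>();
    dpRoots.add("/tmp/hive-scratch/dyn-part-output/ds=2008-04-08");
    moveWork.setDPSpecPaths(dpRoots);
    ArrayList<String> roots = moveWork.getDPSpecPaths();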
Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part1.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part1.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,11 @@
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.max.dynamic.partitions=2;
+
+drop table dynamic_partition;
+create table dynamic_partition (key string) partitioned by (value string);
+
+insert overwrite table dynamic_partition partition(hr) select key, value from src;
+
+drop table dynamic_partition;
+
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part1.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part1.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,30 @@
+show partitions srcpart;
+
+drop table nzhang_part1;
+drop table nzhang_part2;
+
+create table if not exists nzhang_part1 like srcpart;
+create table if not exists nzhang_part2 like srcpart;
+describe extended nzhang_part1;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.dynamic.partition=true;
+
+explain
+from srcpart
+insert overwrite table nzhang_part1 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08'
+insert overwrite table nzhang_part2 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+from srcpart
+insert overwrite table nzhang_part1 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08'
+insert overwrite table nzhang_part2 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+
+show partitions nzhang_part1;
+show partitions nzhang_part2;
+
+select * from nzhang_part1 where ds is not null and hr is not null;
+select * from nzhang_part2 where ds is not null and hr is not null;
+
+drop table nzhang_part1;
+drop table nzhang_part2;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part10.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part10.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part10.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part10.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,24 @@
+show partitions srcpart;
+
+drop table nzhang_part10;
+
+create table if not exists nzhang_part10 like srcpart;
+describe extended nzhang_part10;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+explain
+from srcpart
+insert overwrite table nzhang_part10 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+from srcpart
+insert overwrite table nzhang_part10 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+
+show partitions nzhang_part10;
+
+select * from nzhang_part10 where ds is not null and hr is not null;
+
+drop table nzhang_part10;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part11.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part11.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part11.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part11.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,17 @@
+show partitions srcpart;
+
+drop table nzhang_part;
+create table if not exists nzhang_part like srcpart;
+describe extended nzhang_part;
+
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.exec.compress.output=true;
+set hive.exec.dynamic.partition=true;
+
+insert overwrite table nzhang_part partition (ds="2010-03-03", hr) select key, value, hr from srcpart where ds is not null and hr is not null;
+
+select * from nzhang_part where ds = '2010-03-03' and hr = '11';
+select * from nzhang_part where ds = '2010-03-03' and hr = '12';
+
+drop table nzhang_part;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part12.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part12.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part12.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part12.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,19 @@
+show partitions srcpart;
+
+drop table nzhang_part12;
+
+create table if not exists nzhang_part12 like srcpart;
+describe extended nzhang_part12;
+
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.exec.dynamic.partition=true;
+
+
+insert overwrite table nzhang_part12 partition (ds="2010-03-03", hr) select key, value, cast(hr*2 as int) from srcpart where ds is not null and hr is not null;
+
+show partitions nzhang_part12;
+
+select * from nzhang_part12 where ds is not null and hr is not null;
+
+drop table nzhang_part12;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part13.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part13.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part13.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part13.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,37 @@
+show partitions srcpart;
+
+drop table nzhang_part13;
+
+create table if not exists nzhang_part13 like srcpart;
+describe extended nzhang_part13;
+
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.exec.dynamic.partition=true;
+
+explain
+insert overwrite table nzhang_part13 partition (ds="2010-03-03", hr)
+select * from (
+ select key, value, '22'
+ from src
+ where key < 20
+ union all
+ select key, value, '33'
+ from src
+ where key > 20 and key < 40) s;
+
+insert overwrite table nzhang_part13 partition (ds="2010-03-03", hr)
+select * from (
+ select key, value, '22'
+ from src
+ where key < 20
+ union all
+ select key, value, '33'
+ from src
+ where key > 20 and key < 40) s;
+
+show partitions nzhang_part13;
+
+select * from nzhang_part13 where ds is not null and hr is not null;
+
+drop table nzhang_part13;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part2.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part2.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part2.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part2.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,23 @@
+drop table nzhang_part_bucket;
+create table if not exists nzhang_part_bucket (key string, value string)
+ partitioned by (ds string, hr string)
+ clustered by (key) into 10 buckets;
+
+describe extended nzhang_part_bucket;
+
+set hive.merge.mapfiles=false;
+set hive.enforce.bucketing=true;
+set hive.exec.dynamic.partition=true;
+
+explain
+insert overwrite table nzhang_part_bucket partition (ds='2010-03-23', hr) select key, value, hr from srcpart where ds is not null and hr is not null;
+
+insert overwrite table nzhang_part_bucket partition (ds='2010-03-23', hr) select key, value, hr from srcpart where ds is not null and hr is not null;
+
+show partitions nzhang_part_bucket;
+
+select * from nzhang_part_bucket where ds='2010-03-23' and hr='11' order by key;
+select * from nzhang_part_bucket where ds='2010-03-23' and hr='12' order by key;
+
+drop table nzhang_part_bucket;
+
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part3.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part3.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part3.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part3.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,19 @@
+show partitions srcpart;
+
+drop table nzhang_part3;
+
+create table if not exists nzhang_part3 like srcpart;
+describe extended nzhang_part3;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.dynamic.partition=true;
+
+explain
+insert overwrite table nzhang_part3 partition (ds, hr) select key, value, ds, hr from srcpart where ds is not null and hr is not null;
+
+insert overwrite table nzhang_part3 partition (ds, hr) select key, value, ds, hr from srcpart where ds is not null and hr is not null;
+
+select * from nzhang_part3 where ds is not null and hr is not null;
+
+drop table nzhang_part3;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part4.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part4.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part4.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part4.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,24 @@
+show partitions srcpart;
+
+drop table nzhang_part4;
+
+create table if not exists nzhang_part4 like srcpart;
+describe extended nzhang_part4;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+insert overwrite table nzhang_part4 partition (ds='2008-04-08', hr='existing_value') select key, value from src;
+
+explain
+insert overwrite table nzhang_part4 partition (ds, hr) select key, value, ds, hr from srcpart where ds is not null and hr is not null;
+
+insert overwrite table nzhang_part4 partition (ds, hr) select key, value, ds, hr from srcpart where ds is not null and hr is not null;
+
+show partitions nzhang_part4;
+select * from nzhang_part4 where ds='2008-04-08' and hr is not null;
+
+select * from nzhang_part4 where ds is not null and hr is not null;
+
+drop table nzhang_part4;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part5.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part5.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part5.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part5.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,22 @@
+drop table nzhang_part5;
+
+create table if not exists nzhang_part5 (key string) partitioned by (value string);
+describe extended nzhang_part5;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.max.dynamic.partitions=2000;
+set hive.exec.max.dynamic.partitions.pernode=2000;
+
+explain
+insert overwrite table nzhang_part5 partition (value) select key, value from src;
+
+insert overwrite table nzhang_part5 partition (value) select key, value from src;
+
+show partitions nzhang_part5;
+
+select * from nzhang_part5 where value='val_0';
+select * from nzhang_part5 where value='val_2';
+
+drop table nzhang_part5;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part6.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part6.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part6.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part6.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,16 @@
+show partitions srcpart;
+
+drop table nzhang_part6;
+
+create table if not exists nzhang_part6 like srcpart;
+describe extended nzhang_part6;
+
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.exec.dynamic.partition=true;
+
+insert overwrite table nzhang_part6 partition (ds="2010-03-03", hr) select key, value, hr from srcpart where ds is not null and hr is not null;
+
+select * from nzhang_part6 where ds = '2010-03-03' and hr = '11';
+select * from nzhang_part6 where ds = '2010-03-03' and hr = '12';
+drop table nzhang_part6;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part7.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part7.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part7.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part7.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,14 @@
+show partitions srcpart;
+
+drop table nzhang_part7;
+
+create table if not exists nzhang_part7 like srcpart;
+describe extended nzhang_part7;
+
+
+insert overwrite table nzhang_part7 partition (ds='2010-03-03', hr='12') select key, value from srcpart where ds = '2008-04-08' and hr = '12';
+
+show partitions nzhang_part7;
+
+select * from nzhang_part7 where ds is not null and hr is not null;
+drop table nzhang_part7;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part8.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part8.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part8.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part8.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,24 @@
+show partitions srcpart;
+
+drop table nzhang_part8;
+
+create table if not exists nzhang_part8 like srcpart;
+describe extended nzhang_part8;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+explain extended
+from srcpart
+insert overwrite table nzhang_part8 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08'
+insert overwrite table nzhang_part8 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+from srcpart
+insert overwrite table nzhang_part8 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08'
+insert overwrite table nzhang_part8 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+show partitions nzhang_part8;
+
+select * from nzhang_part8 where ds is not null and hr is not null;
+drop table nzhang_part8;
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part9.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part9.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part9.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part9.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,23 @@
+show partitions srcpart;
+
+drop table nzhang_part9;
+
+create table if not exists nzhang_part9 like srcpart;
+describe extended nzhang_part9;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+explain
+from srcpart
+insert overwrite table nzhang_part9 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08';
+
+from srcpart
+insert overwrite table nzhang_part9 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08';
+
+
+show partitions nzhang_part9;
+
+select * from nzhang_part9 where ds is not null and hr is not null;
+drop table nzhang_part9;
Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part1.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part1.q.out?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part1.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part1.q.out Wed Apr 14 23:36:07 2010
@@ -0,0 +1,10 @@
+PREHOOK: query: drop table dynamic_partition
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table dynamic_partition
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table dynamic_partition (key string) partitioned by (value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table dynamic_partition (key string) partitioned by (value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@dynamic_partition
+FAILED: Error in semantic analysis: Partition column in the partition specification does not exist