Posted to commits@hive.apache.org by na...@apache.org on 2012/12/05 12:59:26 UTC
svn commit: r1417374 [1/11] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/ha...
Author: namit
Date: Wed Dec 5 11:59:15 2012
New Revision: 1417374
URL: http://svn.apache.org/viewvc?rev=1417374&view=rev
Log:
HIVE-3073 Hive List Bucketing - DML support
(Gang Tim Liu via namit)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ListBucketingCtx.java
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_1.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_2.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_3.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_4.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_5.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_6.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_7.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_9.q
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_1.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_2.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_3.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_5.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_8.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPrunerUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
hive/trunk/ql/src/test/queries/clientnegative/column_change_skewedcol_type1.q
hive/trunk/ql/src/test/queries/clientnegative/column_rename5.q
hive/trunk/ql/src/test/queries/clientnegative/create_skewed_table_col_name_value_no_mismatch.q
hive/trunk/ql/src/test/queries/clientnegative/create_skewed_table_dup_col_name.q
hive/trunk/ql/src/test/queries/clientnegative/create_skewed_table_failure_invalid_col_name.q
hive/trunk/ql/src/test/queries/clientnegative/invalid_config1.q
hive/trunk/ql/src/test/queries/clientnegative/load_stored_as_dirs.q
hive/trunk/ql/src/test/queries/clientpositive/alter_skewed_table.q
hive/trunk/ql/src/test/queries/clientpositive/create_alter_list_bucketing_table1.q
hive/trunk/ql/src/test/queries/clientpositive/create_skewed_table1.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_multiskew_1.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_multiskew_2.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_multiskew_3.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_oneskew_1.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_oneskew_2.q
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_oneskew_3.q
hive/trunk/ql/src/test/results/clientnegative/invalid_config1.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_1.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_2.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_3.q.out
hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java Wed Dec 5 11:59:15 2012
@@ -22,12 +22,11 @@ import java.io.IOException;
import java.net.URI;
import java.util.BitSet;
import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-
import org.apache.hadoop.util.Shell;
@@ -121,6 +120,47 @@ public final class FileUtils {
return name.toString();
}
+ /**
+ * The default directory will have the same depth as the number of skewed columns;
+ * this makes future operations, such as DML merge and concatenate merge, easier.
+ * @param skewedCols
+ * @param name
+ * @return
+ */
+ public static String makeDefaultListBucketingDirName(List<String> skewedCols,
+ String name) {
+ String lbDirName;
+ String defaultDir = FileUtils.escapePathName(name);
+ StringBuilder defaultDirPath = new StringBuilder();
+ for (int i = 0; i < skewedCols.size(); i++) {
+ if (i > 0) {
+ defaultDirPath.append(Path.SEPARATOR);
+ }
+ defaultDirPath.append(defaultDir);
+ }
+ lbDirName = defaultDirPath.toString();
+ return lbDirName;
+ }
+
+ /**
+ * Makes a valid list bucketing directory name.
+ * @param lbCols The skewed keys' names
+ * @param vals The skewed values
+ * @return An escaped, valid list bucketing directory name.
+ */
+ public static String makeListBucketingDirName(List<String> lbCols, List<String> vals) {
+ StringBuilder name = new StringBuilder();
+ for (int i = 0; i < lbCols.size(); i++) {
+ if (i > 0) {
+ name.append(Path.SEPARATOR);
+ }
+ name.append(escapePathName((lbCols.get(i)).toLowerCase()));
+ name.append('=');
+ name.append(escapePathName(vals.get(i)));
+ }
+ return name.toString();
+ }
+
// NOTE: This is for generating the internal path name for partitions. Users
// should always use the MetaStore API to get the path name for a partition.
// Users should not directly take partition values and turn it into a path
@@ -157,7 +197,7 @@ public final class FileUtils {
for (char c : clist) {
charToEscape.set(c);
}
-
+
if(Shell.WINDOWS){
//On windows, following chars need to be escaped as well
char [] winClist = {' ', '<','>','|'};
@@ -257,27 +297,4 @@ public final class FileUtils {
}
}
- /**
- * default directory will have the same depth as number of skewed columns
- * this will make future operation easy like DML merge, concatenate merge
- * @param skewedCols
- * @param hconf
- * @return
- */
- public static String makeDefaultListBucketingDirName(List<String> skewedCols,
- Configuration hconf) {
- String lbDirName;
- String defaultDir = FileUtils.escapePathName(HiveConf.getVar(hconf,
- HiveConf.ConfVars.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME));
- StringBuilder defaultDirPath = new StringBuilder();
- for (int i = 0; i < skewedCols.size(); i++) {
- if (i > 0) {
- defaultDirPath.append(Path.SEPARATOR);
- }
- defaultDirPath.append(defaultDir);
- }
- lbDirName = defaultDirPath.toString();
- return lbDirName;
- }
-
}
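
For reference, a minimal standalone sketch of the directory names the two helpers
above produce (path escaping via escapePathName is omitted; the class and method
names here are illustrative, not part of the patch):

    import java.util.Arrays;
    import java.util.List;

    public class LbDirNameSketch {
        private static final String SEP = "/"; // Path.SEPARATOR

        // Mirrors makeListBucketingDirName: one "col=val" level per skewed column.
        static String skewedDir(List<String> cols, List<String> vals) {
            StringBuilder name = new StringBuilder();
            for (int i = 0; i < cols.size(); i++) {
                if (i > 0) {
                    name.append(SEP);
                }
                name.append(cols.get(i).toLowerCase()).append('=').append(vals.get(i));
            }
            return name.toString();
        }

        // Mirrors makeDefaultListBucketingDirName: the default name is repeated once
        // per skewed column, so default rows land at the same depth as skewed rows.
        static String defaultDir(List<String> cols, String defaultName) {
            StringBuilder path = new StringBuilder();
            for (int i = 0; i < cols.size(); i++) {
                if (i > 0) {
                    path.append(SEP);
                }
                path.append(defaultName);
            }
            return path.toString();
        }

        public static void main(String[] args) {
            List<String> cols = Arrays.asList("key", "value");
            // key=484/value=val_484
            System.out.println(skewedDir(cols, Arrays.asList("484", "val_484")));
            // HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME
            System.out.println(defaultDir(cols, "HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME"));
        }
    }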
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Dec 5 11:59:15 2012
@@ -658,15 +658,17 @@ public class HiveConf extends Configurat
"hive.multi.insert.move.tasks.share.dependencies", false),
/* The following section contains all configurations used for list bucketing feature.*/
- // Enable list bucketing DDL. Default value is false so that we disable it by default.
- // This will be removed once the rest of the DML changes are committed.
- HIVE_INTERNAL_DDL_LIST_BUCKETING_ENABLE("hive.internal.ddl.list.bucketing.enable", false),
-
- // Default list bucketing directory name.
- HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME("hive.exec.list.bucketing.default.dir",
- "HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME"),
+ /* This is not for clients, but only for the block merge task. */
+ /* It is used by BlockMergeTask to send a flag to RCFileMergeMapper */
+ /* about the alter table ... concatenate and list bucketing case. */
+ HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING(
+ "hive.merge.current.job.concatenate.list.bucketing", true),
+ /* This is not for clients, but only for the block merge task. */
+ /* It is used by BlockMergeTask to send a flag to RCFileMergeMapper */
+ /* about the depth of list bucketing. */
+ HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH(
+ "hive.merge.current.job.concatenate.list.bucketing.depth", 0),
// Enable list bucketing optimizer. Default value is false so that we disable it by default.
- // This will be removed once the rest of the DML changes are committed.
HIVEOPTLISTBUCKETING("hive.optimize.listbucketing", false),
// Allow TCP Keep alive socket option for for HiveServer or a maximum timeout for the socket.
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Wed Dec 5 11:59:15 2012
@@ -537,23 +537,6 @@
</property>
<property>
- <name>hive.exec.list.bucketing.default.dir</name>
- <value>HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME</value>
- <description>Default directory name used in list bucketing.
- List bucketing feature will create sub-directory for each skewed-value and a default directory
- for non-skewed value. This config specifies the default name for the default directory.
- Sub-directory is created by list bucketing DML and under partition directory. User doesn't need
- to know how to construct the canonical path. It just gives user choice if they want to change
- the default directory name.
- For example, there are 2 skewed column c1 and c2. 2 skewed value: (1,a) and (2,b). subdirectory:
- <partition-dir>/c1=1/c2=a/
- <partition-dir>/c1=2/c2=b/
- <partition-dir>/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
- Note: This config won't impact users if they don't list bucketing.
-</description>
-</property>
-
-<property>
<name>hive.skewjoin.key</name>
<value>100000</value>
<description>Determine if we get a skew key in join. If we see more
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Dec 5 11:59:15 2012
@@ -969,8 +969,7 @@ public class Driver implements CommandPr
private boolean validateConfVariables() {
boolean valid = true;
if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES))
- && ((conf.getBoolVar(HiveConf.ConfVars.HIVE_INTERNAL_DDL_LIST_BUCKETING_ENABLE))
- || (conf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE)) || (conf
+ && ((conf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE)) || (conf
.getBoolVar(HiveConf.ConfVars.HIVEOPTLISTBUCKETING)))) {
errorMessage = "FAILED: Hive Internal Error: "
+ ErrorMsg.SUPPORT_DIR_MUST_TRUE_FOR_LIST_BUCKETING.getMsg();
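
The validation above reduces to a single implication; a minimal sketch, assuming the
three booleans are read from the conf as in validateConfVariables (the method name
below is illustrative):

    // hive.mapred.supports.subdirectories must be true whenever
    // mapred.input.dir.recursive or hive.optimize.listbucketing is enabled
    // (hive.internal.ddl.list.bucketing.enable no longer participates).
    static boolean subdirConfIsValid(boolean supportsSubdirs,
            boolean recursiveInputDirs, boolean lbOptimizer) {
        return supportsSubdirs || !(recursiveInputDirs || lbOptimizer);
    }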
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Wed Dec 5 11:59:15 2012
@@ -252,7 +252,7 @@ public enum ErrorMsg {
SUPPORT_DIR_MUST_TRUE_FOR_LIST_BUCKETING(
10199,
"hive.mapred.supports.subdirectories must be true"
- + " if any one of following is true: hive.internal.ddl.list.bucketing.enable,"
+ + " if any one of following is true: "
+ " hive.optimize.listbucketing and mapred.input.dir.recursive"),
SKEWED_TABLE_NO_COLUMN_NAME(10200, "No skewed column name."),
SKEWED_TABLE_NO_COLUMN_VALUE(10201, "No skewed values."),
@@ -270,9 +270,6 @@ public enum ErrorMsg {
ALTER_TABLE_NOT_ALLOWED_RENAME_SKEWED_COLUMN(10207,
" is a skewed column. It's not allowed to rename skewed column"
+ " or change skewed column type."),
- HIVE_INTERNAL_DDL_LIST_BUCKETING_DISABLED(10208,
- "List Bucketing DDL is not allowed to use since feature is not completed yet."),
-
HIVE_GROUPING_SETS_AGGR_NOMAPAGGR(10209,
"Grouping sets aggregations (with rollups or cubes) are not allowed if map-side " +
" aggregation is turned off. Set hive.map.aggr=true if you want to use grouping sets"),
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Dec 5 11:59:15 2012
@@ -464,6 +464,9 @@ public class DDLTask extends Task<DDLWor
// merge work only needs input and output.
MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
mergeFilesDesc.getOutputDir());
+ mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx());
+ mergeWork.resolveConcatenateMerge(db.getConf());
+ mergeWork.setMapperCannotSpanPartns(true);
DriverContext driverCxt = new DriverContext();
BlockMergeTask taskExec = new BlockMergeTask();
taskExec.initialize(db.getConf(), null, driverCxt);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Wed Dec 5 11:59:15 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -83,8 +85,11 @@ public class FileSinkOperator extends Te
protected transient List<Object> dpWritables;
protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
protected transient int maxPartitions;
+ protected transient ListBucketingCtx lbCtx;
+ protected transient boolean isSkewedStoredAsSubDirectories;
private transient boolean statsCollectRawDataSize;
+
private static final transient String[] FATAL_ERR_MSG = {
null, // counter value 0 means no error
"Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions.pernode."
@@ -308,6 +313,7 @@ public class FileSinkOperator extends Te
totalFiles = conf.getTotalFiles();
numFiles = conf.getNumFiles();
dpCtx = conf.getDynPartCtx();
+ lbCtx = conf.getLbCtx();
valToPaths = new HashMap<String, FSPaths>();
taskId = Utilities.getTaskId(hconf);
initializeSpecPath();
@@ -357,13 +363,19 @@ public class FileSinkOperator extends Te
dpSetup();
}
+ if (lbCtx != null) {
+ lbSetup();
+ }
+
if (!bDynParts) {
fsp = new FSPaths(specPath);
// Create all the files - this is required because empty files need to be created for
// empty buckets
// createBucketFiles(fsp);
- valToPaths.put("", fsp); // special entry for non-DP case
+ if (!this.isSkewedStoredAsSubDirectories) {
+ valToPaths.put("", fsp); // special entry for non-DP case
+ }
}
initializeChildren(hconf);
} catch (HiveException e) {
@@ -375,6 +387,13 @@ public class FileSinkOperator extends Te
}
/**
+ * Initialize list bucketing information
+ */
+ private void lbSetup() {
+ this.isSkewedStoredAsSubDirectories = ((lbCtx == null) ? false : lbCtx.isSkewedStoredAsDir());
+ }
+
+ /**
* Set up for dynamic partitioning including a new ObjectInspector for the output row.
*/
private void dpSetup() {
@@ -468,7 +487,7 @@ public class FileSinkOperator extends Te
// we create.
String extension = Utilities.getFileExtension(jc, isCompressed,
hiveOutputFormat);
- if (!bDynParts) {
+ if (!bDynParts && !this.isSkewedStoredAsSubDirectories) {
fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension);
} else {
fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
@@ -539,8 +558,18 @@ public class FileSinkOperator extends Te
@Override
public void processOp(Object row, int tag) throws HiveException {
+ /* Create list bucketing sub-directory only if stored-as-directories is on. */
+ String lbDirName = null;
+ lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
+
+ FSPaths fpaths;
+
if (!bDynParts && !filesCreated) {
- createBucketFiles(fsp);
+ if (lbDirName != null) {
+ FSPaths fsp2 = lookupListBucketingPaths(lbDirName);
+ } else {
+ createBucketFiles(fsp);
+ }
}
// Since File Sink is a terminal operator, forward is not called - so,
@@ -559,8 +588,6 @@ public class FileSinkOperator extends Te
// if DP is enabled, get the final output writers and prepare the real output row
assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct";
- FSPaths fpaths;
-
if (bDynParts) {
// copy the DP column values from the input row to dpVals
dpVals.clear();
@@ -578,10 +605,14 @@ public class FileSinkOperator extends Te
}
// use SubStructObjectInspector to serialize the non-partitioning columns in the input row
recordValue = serializer.serialize(row, subSetOI);
- fpaths = getDynOutPaths(dpVals);
+ fpaths = getDynOutPaths(dpVals, lbDirName);
} else {
- fpaths = fsp;
+ if (lbDirName != null) {
+ fpaths = lookupListBucketingPaths(lbDirName);
+ } else {
+ fpaths = fsp;
+ }
// use SerDe to serialize r, and write it out
recordValue = serializer.serialize(row, inputObjInspectors[0]);
}
@@ -623,7 +654,91 @@ public class FileSinkOperator extends Te
}
}
- private FSPaths getDynOutPaths(List<String> row) throws HiveException {
+ /**
+ * Lookup list bucketing path.
+ * @param lbDirName
+ * @return
+ * @throws HiveException
+ */
+ private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
+ FSPaths fsp2 = valToPaths.get(lbDirName);
+ if (fsp2 == null) {
+ fsp2 = createNewPaths(lbDirName);
+ }
+ return fsp2;
+ }
+
+ /**
+ * Create new paths for the given directory name.
+ *
+ * @param dirName
+ * @return
+ * @throws HiveException
+ */
+ private FSPaths createNewPaths(String dirName) throws HiveException {
+ FSPaths fsp2 = new FSPaths(specPath);
+ if (childSpecPathDynLinkedPartitions != null) {
+ fsp2.tmpPath = new Path(fsp2.tmpPath,
+ dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
+ fsp2.taskOutputTempPath =
+ new Path(fsp2.taskOutputTempPath,
+ dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
+ } else {
+ fsp2.tmpPath = new Path(fsp2.tmpPath, dirName);
+ fsp2.taskOutputTempPath =
+ new Path(fsp2.taskOutputTempPath, dirName);
+ }
+ createBucketFiles(fsp2);
+ valToPaths.put(dirName, fsp2);
+ return fsp2;
+ }
+
+ /**
+ * Generate list bucketing directory name from a row.
+ * @param row row to process.
+ * @return directory name.
+ */
+ private String generateListBucketingDirName(Object row) {
+ if (!this.isSkewedStoredAsSubDirectories) {
+ return null;
+ }
+
+ String lbDirName = null;
+ List<Object> standObjs = new ArrayList<Object>();
+ List<String> skewedCols = lbCtx.getSkewedColNames();
+ List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
+ List<String> skewedValsCandidate = null;
+ Map<List<String>, String> locationMap = lbCtx.getLbLocationMap();
+
+ /* Convert input row to standard objects. */
+ ObjectInspectorUtils.copyToStandardObject(standObjs, row,
+ (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
+
+ assert (standObjs.size() >= skewedCols.size()) :
+ "The row has less number of columns than no. of skewed column.";
+
+ skewedValsCandidate = new ArrayList<String>(skewedCols.size());
+ for (int index : lbCtx.getRowSkewedIndex()) {
+ skewedValsCandidate.add(index, standObjs.get(index).toString());
+ }
+ /* The row matches skewed column names. */
+ if (allSkewedVals.contains(skewedValsCandidate)) {
+ /* matches skewed values. */
+ lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
+ locationMap.put(skewedValsCandidate, lbDirName);
+ } else {
+ /* create default directory. */
+ lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
+ lbCtx.getDefaultDirName());
+ List<String> defaultKey = Arrays.asList(lbCtx.getDefaultKey());
+ if (!locationMap.containsKey(defaultKey)) {
+ locationMap.put(defaultKey, lbDirName);
+ }
+ }
+ return lbDirName;
+ }
+
+ private FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
FSPaths fp;
@@ -631,6 +746,7 @@ public class FileSinkOperator extends Te
String dpDir = getDynPartDirectory(row, dpColNames, numDynParts);
if (dpDir != null) {
+ dpDir = appendListBucketingDirName(lbDirName, dpDir);
FSPaths fsp2 = valToPaths.get(dpDir);
if (fsp2 == null) {
@@ -641,20 +757,7 @@ public class FileSinkOperator extends Te
fatalError = true;
LOG.error("Fatal error was thrown due to exceeding number of dynamic partitions");
}
- fsp2 = new FSPaths(specPath);
- if (childSpecPathDynLinkedPartitions != null) {
- fsp2.tmpPath = new Path(fsp2.tmpPath,
- dpDir + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
- fsp2.taskOutputTempPath =
- new Path(fsp2.taskOutputTempPath,
- dpDir + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
- } else {
- fsp2.tmpPath = new Path(fsp2.tmpPath, dpDir);
- fsp2.taskOutputTempPath =
- new Path(fsp2.taskOutputTempPath, dpDir);
- }
- createBucketFiles(fsp2);
- valToPaths.put(dpDir, fsp2);
+ fsp2 = createNewPaths(dpDir);
}
fp = fsp2;
} else {
@@ -663,6 +766,20 @@ public class FileSinkOperator extends Te
return fp;
}
+ /**
+ * Append list bucketing dir name to original dir name.
+ * Skewed columns cannot be partition columns.
+ * @param lbDirName
+ * @param dpDir
+ * @return
+ */
+ private String appendListBucketingDirName(String lbDirName, String dpDir) {
+ StringBuilder builder = new StringBuilder(dpDir);
+ dpDir = (lbDirName == null) ? dpDir : builder.append(Path.SEPARATOR).append(lbDirName)
+ .toString();
+ return dpDir;
+ }
+
// given the current input row, the mapping for input col info to dp columns, and # of dp cols,
// return the relative path corresponding to the row.
// e.g., ds=2008-04-08/hr=11
@@ -866,10 +983,7 @@ public class FileSinkOperator extends Te
} else {
// for partitioned table, the key is
// common key prefix + static partition spec + DynamicPartSpec + taskID
- String keyPrefix = Utilities.getHashedStatsPrefix(
- conf.getStatsAggPrefix() + spSpec + fspKey + Path.SEPARATOR,
- conf.getMaxStatsKeyPrefixLength());
- key = keyPrefix + taskID;
+ key = createKeyForStatsPublisher(taskID, spSpec, fspKey);
}
Map<String, String> statsToPublish = new HashMap<String, String>();
for (String statType : fspValue.stat.getStoredStats()) {
@@ -891,4 +1005,64 @@ public class FileSinkOperator extends Te
}
}
}
+
+ /**
+ * This is server-side code that creates the key used to save statistics to the stats database.
+ * The client side reads it in StatsTask.java aggregateStats(), via a db query prefix
+ * that is based on the partition spec.
+ * Since store-as-subdir information is not part of partition spec, we have to
+ * remove store-as-subdir information from variable "keyPrefix" calculation.
+ * But we have to keep store-as-subdir information in variable "key" calculation
+ * since each skewed value has a row in stats db and "key" is db key,
+ * otherwise later value overwrites previous value.
+ * The performance impact of the string handling is minimal since this method is
+ * only called once in FileSinkOperator closeOp().
+ * For example:
+ * create table test skewed by (key, value) on (('484','val_484')) stored as DIRECTORIES;
+ * skewedValueDirList contains 2 elements:
+ * 1. key=484/value=val_484
+ * 2. HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
+ * Case #1: Static partition with store-as-sub-dir
+ * spSpec has SP path
+ * fspKey has either
+ * key=484/value=val_484 or
+ * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
+ * After filter, fspKey is empty, storedAsDirPostFix has either
+ * key=484/value=val_484 or
+ * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
+ * so, at the end, "keyPrefix" doesnt have subdir information but "key" has
+ * Case #2: Dynamic partition with store-as-sub-dir. Assume dp part is hr
+ * spSpec has SP path
+ * fspKey has either
+ * hr=11/key=484/value=val_484 or
+ * hr=11/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
+ * After filter, fspKey is hr=11, storedAsDirPostFix has either
+ * key=484/value=val_484 or
+ * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
+ * so, at the end, "keyPrefix" doesn't have subdir information from skewed but "key" has
+ * @param taskID
+ * @param spSpec
+ * @param fspKey
+ * @return
+ */
+ private String createKeyForStatsPublisher(String taskID, String spSpec, String fspKey) {
+ String key;
+ String newFspKey = fspKey;
+ String storedAsDirPostFix = "";
+ if (isSkewedStoredAsSubDirectories) {
+ List<String> skewedValueDirList = this.lbCtx.getSkewedValuesDirNames();
+ for (String dir : skewedValueDirList) {
+ newFspKey = newFspKey.replace(dir, "");
+ if (!newFspKey.equals(fspKey)) {
+ storedAsDirPostFix = dir;
+ break;
+ }
+ }
+ }
+ String keyPrefix = Utilities.getHashedStatsPrefix(
+ conf.getStatsAggPrefix() + spSpec + newFspKey + Path.SEPARATOR,
+ conf.getMaxStatsKeyPrefixLength());
+ key = keyPrefix + storedAsDirPostFix + taskID;
+ return key;
+ }
}
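
A standalone sketch of the key derivation in createKeyForStatsPublisher, with the
hashing and truncation done by Utilities.getHashedStatsPrefix, the stats aggregation
prefix, and the trailing Path.SEPARATOR omitted (class and method names are
illustrative):

    import java.util.Arrays;
    import java.util.List;

    public class StatsKeySketch {

        // Returns {keyPrefix, key}: the prefix drops the stored-as-dir suffix so the
        // client-side aggregation query (keyed by partition spec) still matches, while
        // the key keeps the suffix so each skewed sub-directory gets its own stats row.
        static String[] deriveKeys(String spSpec, String fspKey, String taskID,
                List<String> skewedValueDirNames) {
            String newFspKey = fspKey;
            String storedAsDirPostFix = "";
            for (String dir : skewedValueDirNames) {
                newFspKey = newFspKey.replace(dir, "");
                if (!newFspKey.equals(fspKey)) {
                    storedAsDirPostFix = dir;
                    break;
                }
            }
            String keyPrefix = spSpec + newFspKey;
            String key = keyPrefix + storedAsDirPostFix + taskID;
            return new String[] {keyPrefix, key};
        }

        public static void main(String[] args) {
            List<String> dirs = Arrays.asList(
                "key=484/value=val_484",
                "HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME");
            // Case #2 from the comment above: dynamic partition hr=11 plus a skewed sub-dir.
            String[] kv = deriveKeys("ds=2008-04-08/", "hr=11/key=484/value=val_484",
                "_000000", dirs);
            System.out.println(kv[0]); // ds=2008-04-08/hr=11/  (no sub-dir info)
            System.out.println(kv[1]); // ds=2008-04-08/hr=11/key=484/value=val_484_000000
        }
    }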
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Wed Dec 5 11:59:15 2012
@@ -37,8 +37,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -263,7 +263,8 @@ public class MoveTask extends Task<MoveW
tbd.getPartitionSpec(),
tbd.getReplace(),
dpCtx.getNumDPCols(),
- tbd.getHoldDDLTime());
+ tbd.getHoldDDLTime(),
+ isSkewedStoredAsDirs(tbd));
if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
throw new HiveException("This query creates no partitions." +
@@ -302,7 +303,8 @@ public class MoveTask extends Task<MoveW
dc = null; // reset data container to prevent it being added again.
} else { // static partitions
db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
- tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(), tbd.getInheritTableSpecs());
+ tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(),
+ tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd));
Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
dc = new DataContainer(table.getTTable(), partn.getTPartition());
// add this partition to post-execution hook
@@ -325,6 +327,11 @@ public class MoveTask extends Task<MoveW
}
}
+ private boolean isSkewedStoredAsDirs(LoadTableDesc tbd) {
+ return (tbd.getLbCtx() == null) ? false : tbd.getLbCtx()
+ .isSkewedStoredAsDir();
+ }
+
/*
* Does the move task involve moving to a local file system
*/
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Wed Dec 5 11:59:15 2012
@@ -395,7 +395,10 @@ public class StatsTask extends Task<Stat
}
fileSys = partn.getPartitionPath().getFileSystem(conf);
- fileStatus = Utilities.getFileStatusRecurse(partn.getPartitionPath(), 1, fileSys);
+ /* consider sub-directory created from list bucketing. */
+ int listBucketingDepth = calculateListBucketingDMLDepth(partn);
+ fileStatus = Utilities.getFileStatusRecurse(partn.getPartitionPath(),
+ (1 + listBucketingDepth), fileSys);
newPartStats.setStat(StatsSetupConst.NUM_FILES, fileStatus.length);
long partitionSize = 0L;
@@ -470,6 +473,28 @@ public class StatsTask extends Task<Stat
return ret;
}
+ /**
+ * List bucketing introduces sub-directories.
+ *
+ * Calculate the depth here so that we can descend to the leaf directory
+ * and count the right number of files.
+ *
+ * @param partn
+ * @return
+ */
+ private int calculateListBucketingDMLDepth(Partition partn) {
+ // list bucketing will introduce more files
+ int listBucketingDepth = 0;
+ if ((partn.getSkewedColNames() != null) && (partn.getSkewedColNames().size() > 0)
+ && (partn.getSkewedColValues() != null) && (partn.getSkewedColValues().size() > 0)
+ && (partn.getSkewedColValueLocationMaps() != null)
+ && (partn.getSkewedColValueLocationMaps().size() > 0)) {
+ listBucketingDepth = partn.getSkewedColNames().size();
+ }
+ return listBucketingDepth;
+ }
+
private boolean existStats(Map<String, String> parameters) {
return parameters.containsKey(StatsSetupConst.ROW_COUNT)
|| parameters.containsKey(StatsSetupConst.NUM_FILES)
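
A minimal sketch of the rule in calculateListBucketingDMLDepth: the extra recursion
depth equals the number of skewed columns, but only when the partition is fully
list-bucketed (names, values, and the location map are all non-empty). For a
partition skewed by (key, value) and stored as directories, files sit two levels
below the partition path, so the level passed to getFileStatusRecurse becomes 1 + 2
(the class name below is illustrative):

    import java.util.List;
    import java.util.Map;

    final class LbDepthSketch {
        static int listBucketingDepth(List<String> skewedColNames,
                List<List<String>> skewedColValues,
                Map<List<String>, String> skewedColValueLocationMaps) {
            boolean listBucketed = skewedColNames != null && !skewedColNames.isEmpty()
                && skewedColValues != null && !skewedColValues.isEmpty()
                && skewedColValueLocationMaps != null
                && !skewedColValueLocationMaps.isEmpty();
            return listBucketed ? skewedColNames.size() : 0;
        }
    }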
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Wed Dec 5 11:59:15 2012
@@ -161,6 +161,16 @@ public class BlockMergeTask extends Task
HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS,
work.hasDynamicPartitions());
+ HiveConf.setBoolVar(job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING,
+ work.isListBucketingAlterTableConcatenate());
+
+ HiveConf.setIntVar(
+ job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH,
+ ((work.getListBucketingCtx() == null) ? 0 : work.getListBucketingCtx()
+ .calculateListBucketingLevel()));
+
int returnVal = 0;
RunningJob rj = null;
boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java Wed Dec 5 11:59:15 2012
@@ -18,16 +18,21 @@
package org.apache.hadoop.hive.ql.io.rcfile.merge;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -43,6 +48,8 @@ public class MergeWork extends MapredWor
private String outputDir;
private boolean hasDynamicPartitions;
private DynamicPartitionCtx dynPartCtx;
+ private boolean isListBucketingAlterTableConcatenate;
+ private ListBucketingCtx listBucketingCtx;
public MergeWork() {
}
@@ -115,7 +122,7 @@ public class MergeWork extends MapredWor
}
@Override
- public void resolveDynamicPartitionMerge(HiveConf conf, Path path,
+ public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, Path path,
TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) {
String inputFormatClass = conf.getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
@@ -126,12 +133,56 @@ public class MergeWork extends MapredWor
String msg = "Merge input format class not found";
throw new RuntimeException(msg);
}
- super.resolveDynamicPartitionMerge(conf, path, tblDesc, aliases, partDesc);
+ super.resolveDynamicPartitionStoredAsSubDirsMerge(conf, path, tblDesc, aliases, partDesc);
// Add the DP path to the list of input paths
inputPaths.add(path.toString());
}
+ /**
+ * alter table ... concatenate
+ *
+ * If the table is skewed, use its sub-directories as the input paths.
+ */
+ public void resolveConcatenateMerge(HiveConf conf) {
+ isListBucketingAlterTableConcatenate = ((listBucketingCtx == null) ? false : listBucketingCtx
+ .isSkewedStoredAsDir());
+ if (isListBucketingAlterTableConcatenate) {
+ // use sub-dir as inputpath.
+ assert ((this.inputPaths != null) && (this.inputPaths.size() == 1)) :
+ "alter table ... concatenate should only have one directory inside inputpaths";
+ String dirName = inputPaths.get(0);
+ Path dirPath = new Path(dirName);
+ try {
+ FileSystem inpFs = dirPath.getFileSystem(conf);
+ FileStatus[] status = Utilities.getFileStatusRecurse(dirPath, listBucketingCtx
+ .getSkewedColNames().size(), inpFs);
+ List<String> newInputPath = new ArrayList<String>();
+ boolean succeed = true;
+ for (int i = 0; i < status.length; ++i) {
+ if (status[i].isDir()) {
+ // Add the lb path to the list of input paths
+ newInputPath.add(status[i].getPath().toString());
+ } else {
+ // found a file instead of a dir; don't change inputPaths
+ succeed = false;
+ }
+ }
+ assert (succeed || ((!succeed) && newInputPath.isEmpty())) : "This partition has an"
+ + " inconsistent file structure: "
+ + "it is stored as sub-directories, so all files are expected at the same depth.";
+ if (succeed) {
+ inputPaths.clear();
+ inputPaths.addAll(newInputPath);
+ }
+ } catch (IOException e) {
+ String msg = "Fail to get filesystem for directory name : " + dirName;
+ throw new RuntimeException(msg, e);
+ }
+
+ }
+ }
+
public DynamicPartitionCtx getDynPartCtx() {
return dynPartCtx;
}
@@ -140,4 +191,25 @@ public class MergeWork extends MapredWor
this.dynPartCtx = dynPartCtx;
}
+ /**
+ * @return the listBucketingCtx
+ */
+ public ListBucketingCtx getListBucketingCtx() {
+ return listBucketingCtx;
+ }
+
+ /**
+ * @param listBucketingCtx the listBucketingCtx to set
+ */
+ public void setListBucketingCtx(ListBucketingCtx listBucketingCtx) {
+ this.listBucketingCtx = listBucketingCtx;
+ }
+
+ /**
+ * @return the isListBucketingAlterTableConcatenate
+ */
+ public boolean isListBucketingAlterTableConcatenate() {
+ return isListBucketingAlterTableConcatenate;
+ }
+
}
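
A local-filesystem approximation of the input-path rewrite in resolveConcatenateMerge,
using java.nio in place of Hadoop's FileSystem API (class and method names are
illustrative):

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class ConcatenateInputPathsSketch {

        // Replace the single partition directory with its depth-N sub-directories
        // (N = number of skewed columns). If a plain file shows up where a directory
        // is expected, keep the original input path untouched.
        static List<Path> lbInputPaths(Path partitionDir, int skewedColCount)
                throws IOException {
            List<Path> level = Collections.singletonList(partitionDir);
            for (int depth = 0; depth < skewedColCount; depth++) {
                List<Path> next = new ArrayList<>();
                for (Path dir : level) {
                    try (DirectoryStream<Path> children = Files.newDirectoryStream(dir)) {
                        for (Path child : children) {
                            if (!Files.isDirectory(child)) {
                                // Inconsistent layout: fall back to the original input.
                                return Collections.singletonList(partitionDir);
                            }
                            next.add(child);
                        }
                    }
                }
                level = next;
            }
            return level;
        }
    }

For a table skewed by two columns, lbInputPaths(partitionDir, 2) would return each
<partition>/k1=v1/k2=v2 sub-directory, mirroring how the committed code swaps the
single partition path in inputPaths for the skewed sub-directories.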
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java Wed Dec 5 11:59:15 2012
@@ -60,6 +60,10 @@ public class RCFileMergeMapper extends M
int columnNumber = 0;
boolean hasDynamicPartitions = false;
+ boolean isListBucketingDML = false;
+ boolean isListBucketingAlterTableConcatenate = false;
+ int listBucketingDepth; // used as the depth for dir calculation, and to tell whether this is a list bucketing case.
+ boolean tmpPathFixedConcatenate = false;
boolean tmpPathFixed = false;
Path tmpPath;
Path taskTmpPath;
@@ -75,6 +79,10 @@ public class RCFileMergeMapper extends M
jc = job;
hasDynamicPartitions = HiveConf.getBoolVar(job,
HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS);
+ isListBucketingAlterTableConcatenate = HiveConf.getBoolVar(job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING);
+ listBucketingDepth = HiveConf.getIntVar(job,
+ HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH);
String specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job)
.toString();
@@ -112,13 +120,33 @@ public class RCFileMergeMapper extends M
key = (RCFileKeyBufferWrapper) k;
}
- if (hasDynamicPartitions) {
- if (tmpPathFixed) {
+ /**
+ * 1. isListBucketingAlterTableConcatenate is true only for alter table ...
+ * concatenate on a stored-as-directories table, so the if branch handles the list
+ * bucketing alter table merge with the help of fixTmpPathConcatenate.
+ * 2. For DML, isListBucketingAlterTableConcatenate is false, so the else branch
+ * applies. Inside it there is another check:
+ * 2.1 for DP or LB we fix the path with the help of fixTmpPath(..), since both use
+ * sub-directories; this covers SP + LB as well.
+ * 2.2 for SP alone, without LB, we don't fix the path.
+ */
+ // Fix temp path for alter table ... concatenate
+ if (isListBucketingAlterTableConcatenate) {
+ if (this.tmpPathFixedConcatenate) {
checkPartitionsMatch(key.inputPath.getParent());
} else {
- // We haven't fixed the TMP path for this mapper yet
- fixTmpPath(key.inputPath.getParent());
- tmpPathFixed = true;
+ fixTmpPathConcatenate(key.inputPath.getParent());
+ tmpPathFixedConcatenate = true;
+ }
+ } else {
+ if (hasDynamicPartitions || (listBucketingDepth > 0)) {
+ if (tmpPathFixed) {
+ checkPartitionsMatch(key.inputPath.getParent());
+ } else {
+ // We haven't fixed the TMP path for this mapper yet
+ fixTmpPath(key.inputPath.getParent());
+ tmpPathFixed = true;
+ }
}
}
@@ -166,7 +194,18 @@ public class RCFileMergeMapper extends M
/**
* Fixes tmpPath to point to the correct partition.
* Before this is called, tmpPath will default to the root tmp table dir
- *
+ * fixTmpPath(..) works for DP + LB + multiple skewed values + merge. reason:
+ * 1. fixTmpPath(..) compares inputPath and tmpDepth, find out path difference and put it into
+ * newPath. Then add newpath to existing this.tmpPath and this.taskTmpPath.
+ * 2. The path difference between inputPath and tmpDepth can be DP or DP+LB. It will automatically
+ * handle it.
+ * 3. For example,
+ * if inputpath is <prefix>/-ext-10002/hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
+ * HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME
+ * tmppath is <prefix>/_tmp.-ext-10000
+ * newpath will be hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME
+ * Then, this.tmpPath and this.taskTmpPath will be updated correctly.
+ * list_bucket_dml_6.q covers this case: DP + LB + multiple skewed values + merge.
* @param inputPath
* @throws HiveException
* @throws IOException
@@ -193,6 +232,48 @@ public class RCFileMergeMapper extends M
updatePaths(newTmpPath, newTaskTmpPath);
}
+ /**
+ * Fixes tmpPath to point to the correct list bucketing sub-directories.
+ * Before this is called, tmpPath will default to the root tmp table dir
+ * Reasons to add a new method instead of changing fixTmpPath():
+ * Reason 1: the logic differs slightly.
+ * fixTmpPath(..) needs 2 variables to decide the path delta, which goes into variable newPath:
+ * 1. inputPath.depth()
+ * 2. tmpPath.depth()
+ * fixTmpPathConcatenate needs 2 variables too but one of them is different from fixTmpPath(..)
+ * 1. inputPath.depth()
+ * 2. listBucketingDepth
+ * Reason 2: lower risk.
+ * The existing logic around map() and fixTmpPath() is not trivial. To keep the impact
+ * on the existing flow minimal, we avoid changing existing code and add new code for
+ * the new feature instead.
+ *
+ * @param inputPath
+ * @throws HiveException
+ * @throws IOException
+ */
+ private void fixTmpPathConcatenate(Path inputPath)
+ throws HiveException, IOException {
+ dpPath = inputPath;
+ Path newPath = new Path(".");
+
+ int depth = listBucketingDepth;
+ // Build the path from bottom up. pick up list bucketing subdirectories
+ while ((inputPath != null) && (depth > 0)) {
+ newPath = new Path(inputPath.getName(), newPath);
+ inputPath = inputPath.getParent();
+ depth--;
+ }
+
+ Path newTmpPath = new Path(tmpPath, newPath);
+ Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+ if (!fs.exists(newTmpPath)) {
+ fs.mkdirs(newTmpPath);
+ }
+ updatePaths(newTmpPath, newTaskTmpPath);
+ }
+
+
@Override
public void close() throws IOException {
// close writer
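
The bottom-up walk in fixTmpPathConcatenate effectively keeps the last
listBucketingDepth components of the input directory and re-roots them under
tmpPath and taskTmpPath. A pure-string sketch of the same idea (assumes the path
has at least `depth` components; class and method names are illustrative):

    final class PathSuffixSketch {
        // Keep the last `depth` components of the input path.
        static String lastComponents(String inputPath, int depth) {
            String[] parts = inputPath.split("/");
            StringBuilder suffix = new StringBuilder();
            for (int i = parts.length - depth; i < parts.length; i++) {
                if (suffix.length() > 0) {
                    suffix.append('/');
                }
                suffix.append(parts[i]);
            }
            return suffix.toString();
        }
    }

    // lastComponents("/prefix/-ext-10002/hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME"
    //     + "/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME", 2)
    // -> "HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME"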
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Dec 5 11:59:15 2012
@@ -28,6 +28,7 @@ import static org.apache.hadoop.hive.ser
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaException;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
@@ -71,9 +73,11 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
import org.apache.hadoop.hive.metastore.api.Role;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
+import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -1116,7 +1120,7 @@ public class Hive {
/**
* Load a directory into a Hive Table Partition - Alters existing content of
- * the partition with the contents of loadPath. - If he partition does not
+ * the partition with the contents of loadPath. - If the partition does not
* exist - one is created - files in loadPath are moved into Hive. But the
* directory itself is not removed.
*
@@ -1135,7 +1139,7 @@ public class Hive {
*/
public void loadPartition(Path loadPath, String tableName,
Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
- boolean inheritTableSpecs)
+ boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir)
throws HiveException {
Table tbl = getTable(tableName);
try {
@@ -1187,7 +1191,21 @@ public class Hive {
// recreate the partition if it existed before
if (!holdDDLTime) {
- getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
+ Partition newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(),
+ inheritTableSpecs);
+ if (isSkewedStoreAsSubdir) {
+ org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
+ SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
+ /* Construct list bucketing location mappings from sub-directory name. */
+ Map<List<String>, String> skewedColValueLocationMaps = constructListBucketingLocationMap(
+ newPartPath, skewedInfo);
+ /* Add list bucketing location mappings. */
+ skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
+ newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
+ alterPartition(tbl.getTableName(), new Partition(tbl, newCreatedTpart));
+ newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
+ newCreatedTpart = newTPart.getTPartition();
+ }
}
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
@@ -1195,10 +1213,96 @@ public class Hive {
} catch (MetaException e) {
LOG.error(StringUtils.stringifyException(e));
throw new HiveException(e);
+ } catch (InvalidOperationException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new HiveException(e);
+ }
+
+ }
+
+ /**
+ * Walk through sub-directory tree to construct list bucketing location map.
+ *
+ * @param fSta
+ * @param fSys
+ * @param skewedColValueLocationMaps
+ * @param newPartPath
+ * @param skewedInfo
+ * @throws IOException
+ */
+private void walkDirTree(FileStatus fSta, FileSystem fSys,
+ Map<List<String>, String> skewedColValueLocationMaps, Path newPartPath, SkewedInfo skewedInfo)
+ throws IOException {
+ /* Base Case. It's leaf. */
+ if (!fSta.isDir()) {
+ /* construct one location map if not exists. */
+ constructOneLBLocationMap(fSta, skewedColValueLocationMaps, newPartPath, skewedInfo);
+ return;
+ }
+
+ /* dfs. */
+ FileStatus[] children = fSys.listStatus(fSta.getPath());
+ if (children != null) {
+ for (FileStatus child : children) {
+ walkDirTree(child, fSys, skewedColValueLocationMaps, newPartPath, skewedInfo);
}
+ }
+}
+/**
+ * Construct a list bucketing location map
+ * @param fSta
+ * @param skewedColValueLocationMaps
+ * @param newPartPath
+ * @param skewedInfo
+ */
+private void constructOneLBLocationMap(FileStatus fSta,
+ Map<List<String>, String> skewedColValueLocationMaps,
+ Path newPartPath, SkewedInfo skewedInfo) {
+ Path lbdPath = fSta.getPath().getParent();
+ List<String> skewedValue = new ArrayList<String>();
+ String lbDirName = FileUtils.unescapePathName(lbdPath.toString());
+ String partDirName = FileUtils.unescapePathName(newPartPath.toString());
+ String lbDirSuffix = lbDirName.replace(partDirName, "");
+ String[] dirNames = lbDirSuffix.split(Path.SEPARATOR);
+ for (String dirName : dirNames) {
+ if ((dirName != null) && (dirName.length() > 0)) {
+ // Construct skewed-value to location map except default directory.
+ // why? query logic knows the default-dir structure and doesn't need to get it from the map
+ if (!dirName
+ .equalsIgnoreCase(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME)) {
+ String[] kv = dirName.split("=");
+ if (kv.length == 2) {
+ skewedValue.add(kv[1]);
+ }
+ }
+ }
+ }
+ if ((skewedValue.size() > 0) && (skewedValue.size() == skewedInfo.getSkewedColNames().size())
+ && !skewedColValueLocationMaps.containsKey(skewedValue)) {
+ skewedColValueLocationMaps.put(skewedValue, lbDirName);
+ }
+}
+
+ /**
+ * Construct location map from path
+ *
+ * @param newPartPath
+ * @param skewedInfo
+ * @return
+ * @throws IOException
+ * @throws FileNotFoundException
+ */
+ private Map<List<String>, String> constructListBucketingLocationMap(Path newPartPath,
+ SkewedInfo skewedInfo) throws IOException, FileNotFoundException {
+ Map<List<String>, String> skewedColValueLocationMaps = new HashMap<List<String>, String>();
+ FileSystem fSys = newPartPath.getFileSystem(conf);
+ walkDirTree(fSys.getFileStatus(newPartPath), fSys, skewedColValueLocationMaps, newPartPath,
+ skewedInfo);
+ return skewedColValueLocationMaps;
}
+
/**
* Given a source directory name of the load path, load all dynamically generated partitions
* into the specified table and return a list of strings that represent the dynamic partition
@@ -1214,7 +1318,7 @@ public class Hive {
*/
public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
String tableName, Map<String, String> partSpec, boolean replace,
- int numDP, boolean holdDDLTime)
+ int numDP, boolean holdDDLTime, boolean listBucketingEnabled)
throws HiveException {
Set<Path> validPartitions = new HashSet<Path>();
@@ -1263,7 +1367,8 @@ public class Hive {
fullPartSpecs.add(fullPartSpec);
// finally load the partition -- move the file to the final table address
- loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true);
+ loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
+ listBucketingEnabled);
LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
}
return fullPartSpecs;
@@ -2030,9 +2135,8 @@ public class Hive {
* @param oldPath
* The directory where the old data location, need to be cleaned up.
*/
- static protected void replaceFiles(Path srcf, Path destf, Path oldPath,
- HiveConf conf) throws HiveException {
-
+ static protected void replaceFiles(Path srcf, Path destf, Path oldPath, HiveConf conf)
+ throws HiveException {
try {
FileSystem fs = srcf.getFileSystem(conf);
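
A standalone sketch of how constructOneLBLocationMap turns a sub-directory suffix
back into the skewed-value key for the location map (the literal default directory
name below stands in for ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME;
class and method names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class LbLocationKeySketch {

        static List<String> skewedValuesFromSuffix(String lbDirSuffix) {
            List<String> skewedValue = new ArrayList<>();
            for (String dirName : lbDirSuffix.split("/")) {
                if (dirName.isEmpty()
                    || dirName.equalsIgnoreCase("HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME")) {
                    continue; // query planning already knows the default-dir layout
                }
                String[] kv = dirName.split("=");
                if (kv.length == 2) {
                    skewedValue.add(kv[1]);
                }
            }
            return skewedValue;
        }

        public static void main(String[] args) {
            // Prints [484, val_484]
            System.out.println(skewedValuesFromSuffix("/key=484/value=val_484"));
        }
    }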
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Wed Dec 5 11:59:15 2012
@@ -614,6 +614,10 @@ public class Partition implements Serial
tPartition.setLastAccessTime(lastAccessTime);
}
+ public boolean isStoredAsSubDirectories() {
+ return tPartition.getSd().isStoredAsSubDirectories();
+ }
+
public List<List<String>> getSkewedColValues(){
return tPartition.getSd().getSkewedInfo().getSkewedColValues();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java Wed Dec 5 11:59:15 2012
@@ -21,8 +21,11 @@ package org.apache.hadoop.hive.ql.metada
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.hive.metastore.TableType;
@@ -34,6 +37,7 @@ import org.apache.hadoop.hive.ql.index.H
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.DescTableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ShowIndexesDesc;
@@ -209,8 +213,18 @@ public final class MetaDataFormatUtils {
Map<List<String>, String> skewedColMap = storageDesc.getSkewedInfo()
.getSkewedColValueLocationMaps();
if ((skewedColMap!=null) && (skewedColMap.size() > 0)) {
- formatOutput("Skewed Value to Location Mapping:", skewedColMap.toString(),
+ formatOutput("Skewed Value to Path:", skewedColMap.toString(),
tableInfo);
+ Map<List<String>, String> truncatedSkewedColMap = new HashMap<List<String>, String>();
+ // Walk through the existing map and truncate each path so that tests won't mask it;
+ // then we can verify that the location is right.
+ Set<Entry<List<String>, String>> entries = skewedColMap.entrySet();
+ for (Entry<List<String>, String> entry : entries) {
+ truncatedSkewedColMap.put(entry.getKey(),
+ PlanUtils.removePrefixFromWarehouseConfig(entry.getValue()));
+ }
+ formatOutput("Skewed Value to Truncated Path:",
+ truncatedSkewedColMap.toString(), tableInfo);
}
}
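For illustration, here is a minimal self-contained sketch of the truncation idea in the hunk above: strip the warehouse prefix from each skewed-value location so golden-file test output stays host-independent. It uses plain JDK types with a hypothetical prefix-stripping helper in place of PlanUtils.removePrefixFromWarehouseConfig; the warehouse root and map contents are made-up examples, not Hive's.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SkewedPathTruncation {
  // Hypothetical stand-in for PlanUtils.removePrefixFromWarehouseConfig:
  // drop everything up to and including the warehouse root.
  static String removeWarehousePrefix(String path, String warehouseRoot) {
    int idx = path.indexOf(warehouseRoot);
    return idx >= 0 ? path.substring(idx + warehouseRoot.length()) : path;
  }

  public static void main(String[] args) {
    String warehouseRoot = "/user/hive/warehouse"; // assumed root, demo only
    Map<List<String>, String> skewedColMap = new HashMap<List<String>, String>();
    skewedColMap.put(Arrays.asList("484"),
        "hdfs://host:9000/user/hive/warehouse/t/part=1/key=484");

    // Build a second map with truncated paths, as the patch does, so both
    // the full and the stable truncated locations are printed.
    Map<List<String>, String> truncated = new HashMap<List<String>, String>();
    for (Map.Entry<List<String>, String> e : skewedColMap.entrySet()) {
      truncated.put(e.getKey(),
          removeWarehousePrefix(e.getValue(), warehouseRoot));
    }
    System.out.println("Skewed Value to Path: " + skewedColMap);
    System.out.println("Skewed Value to Truncated Path: " + truncated);
  }
}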
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Wed Dec 5 11:59:15 2012
@@ -106,8 +106,8 @@ public class GenMRFileSink1 implements N
Task<? extends Serializable> currTask = ctx.getCurrTask();
FileSinkOperator fsOp = (FileSinkOperator) nd;
boolean isInsertTable = // is INSERT OVERWRITE TABLE
- fsOp.getConf().getTableInfo().getTableName() != null &&
- parseCtx.getQB().getParseInfo().isInsertToTable();
+ fsOp.getConf().getTableInfo().getTableName() != null &&
+ parseCtx.getQB().getParseInfo().isInsertToTable();
HiveConf hconf = parseCtx.getConf();
// If this file sink desc has been processed due to a linked file sink desc,
@@ -147,21 +147,20 @@ public class GenMRFileSink1 implements N
hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES)) {
chDir = true;
}
- }
- else {
- // There are separate configuration parameters to control whether to
- // merge for a map-only job
- // or for a map-reduce job
- MapredWork currWork = (MapredWork) currTask.getWork();
- boolean mergeMapOnly =
- hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) &&
- currWork.getReducer() == null;
- boolean mergeMapRed =
- hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES) &&
- currWork.getReducer() != null;
- if (mergeMapOnly || mergeMapRed) {
- chDir = true;
- }
+ } else {
+ // There are separate configuration parameters to control whether to
+ // merge for a map-only job
+ // or for a map-reduce job
+ MapredWork currWork = (MapredWork) currTask.getWork();
+ boolean mergeMapOnly =
+ hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) &&
+ currWork.getReducer() == null;
+ boolean mergeMapRed =
+ hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES) &&
+ currWork.getReducer() != null;
+ if (mergeMapOnly || mergeMapRed) {
+ chDir = true;
+ }
}
}
}
@@ -229,20 +228,25 @@ public class GenMRFileSink1 implements N
* Add the StatsTask as a dependent task of the MoveTask
* because StatsTask will change the Table/Partition metadata. For atomicity, we
* should not change the metadata before the data is actually put in place by the MoveTask.
- * @param nd the FileSinkOperator whose results are taken care of by the MoveTask.
- * @param mvTask The MoveTask that moves the FileSinkOperator's results.
- * @param currTask The MapRedTask that the FileSinkOperator belongs to.
- * @param hconf HiveConf
+ *
+ * @param nd
+ * the FileSinkOperator whose results are taken care of by the MoveTask.
+ * @param mvTask
+ * The MoveTask that moves the FileSinkOperator's results.
+ * @param currTask
+ * The MapRedTask that the FileSinkOperator belongs to.
+ * @param hconf
+ * HiveConf
*/
private void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
Task<? extends Serializable> currTask, HiveConf hconf) {
- MoveWork mvWork = ((MoveTask)mvTask).getWork();
+ MoveWork mvWork = ((MoveTask) mvTask).getWork();
StatsWork statsWork = null;
- if(mvWork.getLoadTableWork() != null){
- statsWork = new StatsWork(mvWork.getLoadTableWork());
- }else if (mvWork.getLoadFileWork() != null){
- statsWork = new StatsWork(mvWork.getLoadFileWork());
+ if (mvWork.getLoadTableWork() != null) {
+ statsWork = new StatsWork(mvWork.getLoadTableWork());
+ } else if (mvWork.getLoadFileWork() != null) {
+ statsWork = new StatsWork(mvWork.getLoadFileWork());
}
assert statsWork != null : "Error when generating StatsTask";
statsWork.setStatsReliable(hconf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
@@ -250,7 +254,7 @@ public class GenMRFileSink1 implements N
// AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
// in FileSinkDesc is used for stats publishing. They should be consistent.
- statsWork.setAggKey(((FileSinkOperator)nd).getConf().getStatsAggPrefix());
+ statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix());
Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
// mark the MapredWork and FileSinkOperator for gathering stats
@@ -281,7 +285,7 @@ public class GenMRFileSink1 implements N
ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
for (ColumnInfo ci : inputRS.getSignature()) {
valueCols.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(),
- ci.getTabAlias(), ci.getIsVirtualCol()));
+ ci.getTabAlias(), ci.getIsVirtualCol(), ci.isSkewedCol()));
}
// create a dummy tableScan operator
@@ -314,7 +318,7 @@ public class GenMRFileSink1 implements N
Operator<ExtractDesc> extract = OperatorFactory.getAndMakeChild(new ExtractDesc(
new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
Utilities.ReduceField.VALUE.toString(), "", false)),
- new RowSchema(out_rwsch.getColumnInfos()));
+ new RowSchema(out_rwsch.getColumnInfos()));
TableDesc ts = (TableDesc) fsConf.getTableInfo().clone();
fsConf.getTableInfo().getProperties().remove(
@@ -323,7 +327,7 @@ public class GenMRFileSink1 implements N
FileSinkDesc newFSD = new FileSinkDesc(finalName, ts, parseCtx.getConf()
.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT));
FileSinkOperator newOutput = (FileSinkOperator) OperatorFactory.
- getAndMakeChild(newFSD, inputRS, extract);
+ getAndMakeChild(newFSD, inputRS, extract);
HiveConf conf = parseCtx.getConf();
MapredWork cplan = createMergeTask(conf, tsMerge, fsConf);
@@ -344,9 +348,13 @@ public class GenMRFileSink1 implements N
/**
* Create a MapReduce job for a particular partition if Hadoop version is pre 0.20,
* otherwise create a Map-only job using CombineHiveInputFormat for all partitions.
- * @param fsOp The FileSink operator.
- * @param ctx The MR processing context.
- * @param finalName the final destination path the merge job should output.
+ *
+ * @param fsOp
+ * The FileSink operator.
+ * @param ctx
+ * The MR processing context.
+ * @param finalName
+ * the final destination path the merge job should output.
* @throws SemanticException
*/
private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName)
@@ -372,32 +380,36 @@ public class GenMRFileSink1 implements N
/**
* create a Map-only merge job with the following operators:
+ *
* @param fsInput
* @param ctx
* @param finalName
- * MR job J0:
+ * MR job J0:
* ...
- * |
- * v
- * FileSinkOperator_1 (fsInput)
- * |
- * v
- * Merge job J1:
- * |
- * v
- * TableScan (using CombineHiveInputFormat) (tsMerge)
- * |
- * v
- * FileSinkOperator (fsMerge)
- *
- * Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths do
- * not contain the dynamic partitions (their parent). So after the dynamic partitions are
- * created (after the first job finished before the moveTask or ConditionalTask start),
- * we need to change the pathToPartitionInfo & pathToAlias to include the dynamic partition
- * directories.
+ * |
+ * v
+ * FileSinkOperator_1 (fsInput)
+ * |
+ * v
+ * Merge job J1:
+ * |
+ * v
+ * TableScan (using CombineHiveInputFormat) (tsMerge)
+ * |
+ * v
+ * FileSinkOperator (fsMerge)
+ *
+   *          Here the pathToPartitionInfo & pathToAlias will remain the same, which
+   *          means the paths do not contain the dynamic partitions (their parent).
+   *          So after the dynamic partitions are created (after the first job
+   *          finishes, before the moveTask or ConditionalTask starts), we need to
+   *          change the pathToPartitionInfo & pathToAlias to include the dynamic
+   *          partition directories.
*
*/
- private void createMap4Merge(FileSinkOperator fsInput, GenMRProcContext ctx, String finalName) throws SemanticException {
+ private void createMap4Merge(FileSinkOperator fsInput, GenMRProcContext ctx, String finalName)
+ throws SemanticException {
//
// 1. create the operator tree
@@ -408,14 +420,14 @@ public class GenMRFileSink1 implements N
// Create a TableScan operator
RowSchema inputRS = fsInput.getSchema();
Operator<? extends OperatorDesc> tsMerge =
- OperatorFactory.get(TableScanDesc.class, inputRS);
+ OperatorFactory.get(TableScanDesc.class, inputRS);
// Create a FileSink operator
TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
- FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
+ FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT));
FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
- fsOutputDesc, inputRS, tsMerge);
+ fsOutputDesc, inputRS, tsMerge);
// If the input FileSinkOperator is a dynamic partition enabled, the tsMerge input schema
// needs to include the partition column, and the fsOutput should have
@@ -427,7 +439,7 @@ public class GenMRFileSink1 implements N
String tblAlias = fsInputDesc.getTableInfo().getTableName();
LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
StringBuilder partCols = new StringBuilder();
- for (String dpCol: dpCtx.getDPColNames()) {
+ for (String dpCol : dpCtx.getDPColNames()) {
ColumnInfo colInfo = new ColumnInfo(dpCol,
TypeInfoFactory.stringTypeInfo, // all partition column type should be string
tblAlias, true); // partition column is virtual column
@@ -435,7 +447,7 @@ public class GenMRFileSink1 implements N
colMap.put(dpCol, dpCol); // input and output have the same column name
partCols.append(dpCol).append('/');
}
- partCols.setLength(partCols.length()-1); // remove the last '/'
+ partCols.setLength(partCols.length() - 1); // remove the last '/'
inputRS.setSignature(signature);
// create another DynamicPartitionCtx, which has a different input-to-DP column mapping
@@ -461,16 +473,16 @@ public class GenMRFileSink1 implements N
new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
MapredWork cplan;
- if(parseCtx.getConf().getBoolVar(HiveConf.ConfVars.
+ if (parseCtx.getConf().getBoolVar(HiveConf.ConfVars.
HIVEMERGERCFILEBLOCKLEVEL) &&
fsInputDesc.getTableInfo().getInputFileFormatClass().
- equals(RCFileInputFormat.class)) {
+ equals(RCFileInputFormat.class)) {
// Check if InputFormatClass is valid
String inputFormatClass = parseCtx.getConf().
getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
try {
- Class c = (Class <? extends InputFormat>) Class.forName(inputFormatClass);
+ Class c = (Class<? extends InputFormat>) Class.forName(inputFormatClass);
LOG.info("RCFile format- Using block level merge");
cplan = createRCFileMergeTask(fsInputDesc, finalName,
@@ -492,14 +504,15 @@ public class GenMRFileSink1 implements N
// keep the dynamic partition context in conditional task resolver context
ConditionalResolverMergeFilesCtx mrCtx =
- (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
+ (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
+ mrCtx.setLbCtx(fsInputDesc.getLbCtx());
//
// 3. add the moveTask as the children of the conditional task
//
linkMoveTask(ctx, fsOutput, cndTsk);
- }
+ }
/**
* Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
@@ -542,10 +555,11 @@ public class GenMRFileSink1 implements N
}
/**
- * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a
+ * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a
* load table, and HIVE_MULTI_INSERT_ATOMIC_OUTPUTS is set, adds mvTask as a dependent of
* dependencyTaskForMultiInsert in ctx, otherwise adds mvTask as a dependent of parentTask as
* well.
+ *
* @param ctx
* @param mvTask
* @param parentTask
@@ -576,11 +590,16 @@ public class GenMRFileSink1 implements N
/**
* Create a MapredWork based on input path, the top operator and the input
* table descriptor.
+ *
* @param conf
- * @param topOp the table scan operator that is the root of the MapReduce task.
- * @param fsDesc the file sink descriptor that serves as the input to this merge task.
- * @param parentMR the parent MapReduce work
- * @param parentFS the last FileSinkOperator in the parent MapReduce work
+ * @param topOp
+ * the table scan operator that is the root of the MapReduce task.
+ * @param fsDesc
+ * the file sink descriptor that serves as the input to this merge task.
+ * @param parentMR
+ * the parent MapReduce work
+ * @param parentFS
+ * the last FileSinkOperator in the parent MapReduce work
* @return the MapredWork
*/
private MapredWork createMergeTask(HiveConf conf, Operator<? extends OperatorDesc> topOp,
@@ -604,6 +623,7 @@ public class GenMRFileSink1 implements N
/**
* Create a block level merge task for RCFiles.
+ *
* @param fsInputDesc
* @param finalName
* @return MergeWork if table is stored as RCFile,
@@ -615,9 +635,10 @@ public class GenMRFileSink1 implements N
String inputDir = fsInputDesc.getFinalDirName();
TableDesc tblDesc = fsInputDesc.getTableInfo();
- if(tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+ if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
ArrayList<String> inputDirs = new ArrayList<String>();
- if (!hasDynamicPartitions) {
+ if (!hasDynamicPartitions
+ && !isSkewedStoredAsDirs(fsInputDesc)) {
inputDirs.add(inputDir);
}
@@ -630,10 +651,12 @@ public class GenMRFileSink1 implements N
work.setPathToAliases(pathToAliases);
work.setAliasToWork(
new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
- if (hasDynamicPartitions) {
+ if (hasDynamicPartitions
+ || isSkewedStoredAsDirs(fsInputDesc)) {
work.getPathToPartitionInfo().put(inputDir,
new PartitionDesc(tblDesc, null));
}
+ work.setListBucketingCtx(fsInputDesc.getLbCtx());
return work;
}
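As a rough sketch of why the hunk above stops adding the top-level directory when list bucketing stores skewed data as subdirectories (mirroring the existing dynamic-partition case): the concrete subdirectories are only known at run time, so the merge work starts with an empty input list. All names below are hypothetical stand-ins, not Hive's API.

import java.util.ArrayList;
import java.util.List;

public class MergeInputDirs {
  // Decide which directories a file-merge job should read directly.
  static List<String> inputDirsForMerge(String inputDir,
      boolean hasDynamicPartitions, boolean skewedStoredAsDirs) {
    List<String> dirs = new ArrayList<String>();
    if (!hasDynamicPartitions && !skewedStoredAsDirs) {
      dirs.add(inputDir); // flat layout: merge the directory itself
    }
    // otherwise leave empty: subdirectories are discovered at run time
    return dirs;
  }

  public static void main(String[] args) {
    System.out.println(inputDirsForMerge("/tmp/out", false, false)); // [/tmp/out]
    System.out.println(inputDirsForMerge("/tmp/out", false, true));  // []
  }
}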
@@ -642,12 +665,29 @@ public class GenMRFileSink1 implements N
}
/**
+   * Check whether the table is skewed and stored as directories.
+ *
+ * @param fsInputDesc
+   * @return true if a list bucketing context is attached and it reports skewed data stored as directories
+ */
+ private boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
+ return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
+ .isSkewedStoredAsDir();
+ }
+
+ /**
* Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
- * @param conf HiveConf
- * @param currTask current leaf task
- * @param mvWork MoveWork for the move task
- * @param mergeWork MapredWork for the merge task.
- * @param inputPath the input directory of the merge/move task
+ *
+ * @param conf
+ * HiveConf
+ * @param currTask
+ * current leaf task
+ * @param mvWork
+ * MoveWork for the move task
+ * @param mergeWork
+ * MapredWork for the merge task.
+ * @param inputPath
+ * the input directory of the merge/move task
* @return The conditional task
*/
private ConditionalTask createCondTask(HiveConf conf,
@@ -658,8 +698,8 @@ public class GenMRFileSink1 implements N
// 1) Merge the partitions
// 2) Move the partitions (i.e. don't merge the partitions)
// 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't
- // merge others) in this case the merge is done first followed by the move to prevent
- // conflicts.
+    //    merge others); in this case the merge is done first, followed by the move, to
+    //    prevent conflicts.
Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
@@ -687,7 +727,7 @@ public class GenMRFileSink1 implements N
// create resolver
cndTsk.setResolver(new ConditionalResolverMergeFiles());
ConditionalResolverMergeFilesCtx mrCtx =
- new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
+ new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
cndTsk.setResolverCtx(mrCtx);
// make the conditional task as the child of the current leaf task
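A minimal sketch of the three-way conditional described in the comment above, with hypothetical stand-ins for Hive's Task and resolver machinery: all three plans are registered up front, and a run-time decision picks merge-only, move-only, or merge-then-move (merge first to avoid conflicts).

import java.util.Arrays;
import java.util.List;

public class ConditionalMergeMove {
  static class Task {
    final String name;
    Task(String name) { this.name = name; }
    public String toString() { return name; }
  }

  // A real resolver would inspect the produced files; here two flags fake
  // "every partition has small files" vs. "only some partitions do".
  static List<Task> resolve(List<List<Task>> plans, boolean anySmallFiles,
      boolean allSmallFiles) {
    if (allSmallFiles) { return plans.get(0); }  // 1) merge only
    if (!anySmallFiles) { return plans.get(1); } // 2) move only
    return plans.get(2);                         // 3) merge first, then move
  }

  public static void main(String[] args) {
    List<List<Task>> plans = Arrays.asList(
        Arrays.asList(new Task("mergeOnlyMergeTask")),
        Arrays.asList(new Task("moveOnlyMoveTask")),
        Arrays.asList(new Task("mergeAndMoveMergeTask"),
            new Task("mergeAndMoveMoveTask")));
    System.out.println(resolve(plans, true, false)); // mixed partitions
  }
}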
@@ -719,11 +759,15 @@ public class GenMRFileSink1 implements N
/**
* Process the FileSink operator to generate a MoveTask if necessary.
- * @param nd current FileSink operator
- * @param stack parent operators
+ *
+ * @param nd
+ * current FileSink operator
+ * @param stack
+ * parent operators
* @param opProcCtx
- * @param chDir whether the operator should be first output to a tmp dir and then merged
- * to the final dir later
+ * @param chDir
+ * whether the operator should be first output to a tmp dir and then merged
+ * to the final dir later
* @return the final file name to which the FileSinkOperator should store.
* @throws SemanticException
*/
@@ -783,7 +827,7 @@ public class GenMRFileSink1 implements N
Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
String currAliasId = ctx.getCurrAliasId();
HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
- ctx.getOpTaskMap();
+ ctx.getOpTaskMap();
List<Operator<? extends OperatorDesc>> seenOps = ctx.getSeenOps();
List<Task<? extends Serializable>> rootTasks = ctx.getRootTasks();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java Wed Dec 5 11:59:15 2012
@@ -141,10 +141,14 @@ public abstract class PrunerOperatorFact
TableScanOperator top, ExprNodeDesc new_pruner_pred, Partition part)
throws UDFArgumentException {
Map<String, ExprNodeDesc> oldPartToPruner = opToPrunner.get(top);
+ Map<String, ExprNodeDesc> partToPruner = null;
ExprNodeDesc pruner_pred = null;
if (oldPartToPruner == null) {
pruner_pred = new_pruner_pred;
+ // create new mapping
+ partToPruner = new HashMap<String, ExprNodeDesc>();
} else {
+ partToPruner = oldPartToPruner;
ExprNodeDesc old_pruner_pred = oldPartToPruner.get(part.getName());
if (old_pruner_pred != null) {
// or the old_pruner_pred and the new_ppr_pred
@@ -156,8 +160,8 @@ public abstract class PrunerOperatorFact
}
// Put the mapping from part to pruner_pred
- Map<String, ExprNodeDesc> partToPruner = new HashMap<String, ExprNodeDesc>();
partToPruner.put(part.getName(), pruner_pred);
+
// Put the mapping from table scan operator to part-pruner map
opToPrunner.put(top, partToPruner);
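Beyond reformatting, this hunk fixes a subtle bug: the partition-to-predicate map was previously recreated on every call, so only the most recently processed partition kept its pruner predicate. A minimal sketch of the corrected reuse pattern, with plain JDK maps and strings standing in for Hive's operator and ExprNodeDesc types:

import java.util.HashMap;
import java.util.Map;

public class PrunerMapReuse {
  // opToPruner: per-operator map of partition name -> predicate.
  static void addPruningPred(Map<String, Map<String, String>> opToPruner,
      String op, String partName, String pred) {
    Map<String, String> partToPruner = opToPruner.get(op);
    if (partToPruner == null) {
      partToPruner = new HashMap<String, String>(); // first entry for this op
    }
    // The pre-patch code created a fresh map here unconditionally,
    // discarding predicates recorded for earlier partitions.
    partToPruner.put(partName, pred);
    opToPruner.put(op, partToPruner);
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> opToPruner =
        new HashMap<String, Map<String, String>>();
    addPruningPred(opToPruner, "TS_0", "ds=1", "col=1");
    addPruningPred(opToPruner, "TS_0", "ds=2", "col=2");
    System.out.println(opToPruner); // both partition predicates retained
  }
}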
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java Wed Dec 5 11:59:15 2012
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.optimizer.PrunerUtils;
@@ -36,17 +35,12 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.session.SessionState;
/**
* The transformation step that does list bucketing pruning.
*
*/
public class ListBucketingPruner implements Transform {
-
- public static final String DEFAULT_SKEWED_DIRECTORY = SessionState.get()
- .getConf().getVar(ConfVars.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME);
- public static final String DEFAULT_SKEWED_KEY = "HIVE_DEFAULT_LIST_BUCKETING_KEY";
static final Log LOG = LogFactory.getLog(ListBucketingPruner.class.getName());
/*
@@ -312,9 +306,12 @@ public class ListBucketingPruner impleme
// Handle skewed value.
if (skewedValues.contains(cell)) { // if it is skewed value
if ((matchResult == null) || matchResult) { // add directory to path unless value is false
- assert (mappings.containsKey(cell)) : "Skewed location mappings doesn't have an entry "
- + "for a skewed value: " + cell;
- selectedPaths.add(new Path(mappings.get(cell)));
+          /* It's a valid case if a partition: */
+          /* 1. is defined with skewed columns and skewed values in metadata, and */
+          /* 2. doesn't have all of those skewed values within its data. */
+ if (mappings.get(cell) != null) {
+ selectedPaths.add(new Path(mappings.get(cell)));
+ }
}
} else {
// Non-skewed value, add it to list for later handle on default directory.
@@ -344,8 +341,10 @@ public class ListBucketingPruner impleme
StringBuilder builder = new StringBuilder();
builder.append(part.getLocation());
builder.append(Path.SEPARATOR);
- builder.append((FileUtils.makeDefaultListBucketingDirName(
- part.getSkewedColNames(), SessionState.get().getConf())));
+ builder
+ .append((FileUtils.makeDefaultListBucketingDirName(
+ part.getSkewedColNames(),
+ ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME)));
selectedPaths.add(new Path(builder.toString()));
}
}
@@ -480,7 +479,7 @@ public class ListBucketingPruner impleme
throws SemanticException {
// Calculate unique skewed elements for each skewed column.
List<List<String>> uniqSkewedElements = DynamicMultiDimensionalCollection.uniqueElementsList(
- values, DEFAULT_SKEWED_KEY);
+ values, ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_KEY);
// Calculate complete dynamic-multi-dimension collection.
return DynamicMultiDimensionalCollection.flat(uniqSkewedElements);
}
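To make the null guard above concrete: a partition can legitimately declare a skewed value in metadata without ever having stored data for it, in which case the value-to-location map has no entry and no directory should be selected. A self-contained sketch with made-up paths; the default-directory layout shown is hypothetical.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SkewedPathSelection {
  public static void main(String[] args) {
    // "103" is declared skewed in metadata but absent from the data,
    // so it has no directory mapping.
    Map<String, String> valueToDir = new HashMap<String, String>();
    valueToDir.put("484", "/wh/t/part=1/key=484");

    List<String> selectedPaths = new ArrayList<String>();
    for (String skewedValue : new String[] {"484", "103"}) {
      String dir = valueToDir.get(skewedValue);
      if (dir != null) { // the added guard: skip values with no data
        selectedPaths.add(dir);
      }
    }
    // non-skewed values are served from the shared default directory
    selectedPaths.add("/wh/t/part=1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME");
    System.out.println(selectedPaths); // default dir plus the one mapped value
  }
}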
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPrunerUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPrunerUtils.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPrunerUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPrunerUtils.java Wed Dec 5 11:59:15 2012
@@ -36,6 +36,12 @@ import org.apache.hadoop.hive.ql.udf.gen
*/
public final class ListBucketingPrunerUtils {
+  /* Default list bucketing directory name. Internal use only; not for clients. */
+  public static final String HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME =
+      "HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME";
+  /* Default list bucketing directory key. Internal use only; not for clients. */
+  public static final String HIVE_LIST_BUCKETING_DEFAULT_KEY = "HIVE_DEFAULT_LIST_BUCKETING_KEY";
+
/**
* Decide if pruner skips the skewed directory
* Input: if the skewed value matches the expression tree
@@ -280,9 +286,9 @@ public final class ListBucketingPrunerUt
// but if it is unknown, not(c=3) will be unknown, and we will choose the default dir.
// 3 all others, return false
if (cellValueInPosition.equals(constantValueInFilter)
- && !cellValueInPosition.equals(ListBucketingPruner.DEFAULT_SKEWED_KEY)) {
+ && !cellValueInPosition.equals(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_KEY)) {
result = Boolean.TRUE;
- } else if (cellValueInPosition.equals(ListBucketingPruner.DEFAULT_SKEWED_KEY)
+ } else if (cellValueInPosition.equals(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_KEY)
&& !uniqSkewedValuesInPosition.contains(constantValueInFilter)) {
result = null;
} else {
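The branch above implements a three-valued comparison: TRUE when the cell equals the filter constant and is a genuine skewed value; null, meaning unknown, when the cell is the default-key placeholder and the constant is not among the known skewed values (the row could live in the default directory); FALSE otherwise. A standalone sketch follows, using simple strings and invented values.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ThreeValuedSkewMatch {
  static final String DEFAULT_KEY = "HIVE_DEFAULT_LIST_BUCKETING_KEY";

  // Returns Boolean.TRUE, Boolean.FALSE, or null for "unknown".
  static Boolean matches(String cellValue, String constantInFilter,
      Set<String> knownSkewedValues) {
    if (cellValue.equals(constantInFilter) && !cellValue.equals(DEFAULT_KEY)) {
      return Boolean.TRUE;
    } else if (cellValue.equals(DEFAULT_KEY)
        && !knownSkewedValues.contains(constantInFilter)) {
      return null; // might be in the default directory: keep it
    } else {
      return Boolean.FALSE;
    }
  }

  public static void main(String[] args) {
    Set<String> skewed = new HashSet<String>(Arrays.asList("484", "103"));
    System.out.println(matches("484", "484", skewed));      // true
    System.out.println(matches(DEFAULT_KEY, "51", skewed)); // null (unknown)
    System.out.println(matches("103", "484", skewed));      // false
  }
}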
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java Wed Dec 5 11:59:15 2012
@@ -23,13 +23,15 @@ import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
@Explain(displayName = "Alter Table Partition Merge Files")
public class AlterTablePartMergeFilesDesc {
-
+
private String tableName;
private HashMap<String, String> partSpec;
-
+ private ListBucketingCtx lbCtx; // context for list bucketing.
+
private List<String> inputDir = new ArrayList<String>();
private String outputDir = null;
@@ -73,4 +75,18 @@ public class AlterTablePartMergeFilesDes
this.inputDir = inputDir;
}
+ /**
+ * @return the lbCtx
+ */
+ public ListBucketingCtx getLbCtx() {
+ return lbCtx;
+ }
+
+ /**
+ * @param lbCtx the lbCtx to set
+ */
+ public void setLbCtx(ListBucketingCtx lbCtx) {
+ this.lbCtx = lbCtx;
+ }
+
}