Posted to commits@hive.apache.org by pr...@apache.org on 2014/09/13 02:39:28 UTC
svn commit: r1624688 [1/9] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ itests/src/test/resources/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/h...
Author: prasanthj
Date: Sat Sep 13 00:39:26 2014
New Revision: 1624688
URL: http://svn.apache.org/r1624688
Log:
HIVE-7704: Create tez task for fast file merging (Prasanth J, reviewed by Gunther Hagleitner, Vikram Dixit)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OrcFileMergeDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/RCFileMergeDesc.java
hive/trunk/ql/src/test/queries/clientpositive/orc_merge5.q
hive/trunk/ql/src/test/queries/clientpositive/orc_merge6.q
hive/trunk/ql/src/test/queries/clientpositive/orc_merge7.q
hive/trunk/ql/src/test/results/clientpositive/orc_merge5.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_merge6.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_merge7.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge5.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge6.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge7.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge_incompat1.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge_incompat2.q.out
Removed:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileMergeMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/itests/src/test/resources/testconfiguration.properties
hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q
hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q
hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out
hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out
hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out
hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_merge1.q.out
hive/trunk/ql/src/test/results/clientpositive/orc_merge3.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_createas1.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_merge1.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_merge2.q.out
hive/trunk/ql/src/test/results/clientpositive/rcfile_merge3.q.out
hive/trunk/ql/src/test/results/clientpositive/tez/orc_merge1.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_10.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_11.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_12.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_13.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_14.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_16.q.out
hive/trunk/ql/src/test/results/clientpositive/union_remove_9.q.out
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Sep 13 00:39:26 2014
@@ -803,18 +803,11 @@ public class HiveConf extends Configurat
"map-reduce job to merge the output files into bigger files. This is only done for map-only jobs \n" +
"if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true."),
HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true, ""),
- HIVEMERGEINPUTFORMATBLOCKLEVEL("hive.merge.input.format.block.level",
- "org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat", ""),
HIVEMERGEORCFILESTRIPELEVEL("hive.merge.orcfile.stripe.level", true,
"When hive.merge.mapfiles or hive.merge.mapredfiles is enabled while writing a\n" +
" table with ORC file format, enabling this config will do stripe level fast merge\n" +
" for small ORC files. Note that enabling this config will not honor padding tolerance\n" +
" config (hive.exec.orc.block.padding.tolerance)."),
- HIVEMERGEINPUTFORMATSTRIPELEVEL("hive.merge.input.format.stripe.level",
- "org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat",
- "Input file format to use for ORC stripe level merging (for internal use only)"),
- HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS(
- "hive.merge.current.job.has.dynamic.partitions", false, ""),
HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
@@ -1676,17 +1669,6 @@ public class HiveConf extends Configurat
" it will now take 512 reducers, similarly if the max number of reducers is 511,\n" +
" and a job was going to use this many, it will now use 256 reducers."),
- /* The following section contains all configurations used for list bucketing feature.*/
- /* This is not for clients. but only for block merge task. */
- /* This is used by BlockMergeTask to send out flag to RCFileMergeMapper */
- /* about alter table...concatenate and list bucketing case. */
- HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING(
- "hive.merge.current.job.concatenate.list.bucketing", true, ""),
- /* This is not for clients. but only for block merge task. */
- /* This is used by BlockMergeTask to send out flag to RCFileMergeMapper */
- /* about depth of list bucketing. */
- HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH(
- "hive.merge.current.job.concatenate.list.bucketing.depth", 0, ""),
HIVEOPTLISTBUCKETING("hive.optimize.listbucketing", false,
"Enable list bucketing optimizer. Default value is false so that we disable it by default."),
Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Sat Sep 13 00:39:26 2014
@@ -96,6 +96,11 @@ minitez.query.files.shared=alter_merge_2
orc_merge2.q,\
orc_merge3.q,\
orc_merge4.q,\
+ orc_merge5.q,\
+ orc_merge6.q,\
+ orc_merge7.q,\
+ orc_merge_incompat1.q,\
+ orc_merge_incompat2.q,\
parallel.q,\
ptf.q,\
sample1.q,\
Modified: hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (original)
+++ hive/trunk/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java Sat Sep 13 00:39:26 2014
@@ -7,10 +7,6 @@
package org.apache.hadoop.hive.ql.plan.api;
-import java.util.Map;
-import java.util.HashMap;
-import org.apache.thrift.TEnum;
-
public enum OperatorType implements org.apache.thrift.TEnum {
JOIN(0),
MAPJOIN(1),
@@ -33,7 +29,9 @@ public enum OperatorType implements org.
PTF(18),
MUX(19),
DEMUX(20),
- EVENT(21);
+ EVENT(21),
+ ORCFILEMERGE(22),
+ RCFILEMERGE(23);
private final int value;
@@ -98,6 +96,10 @@ public enum OperatorType implements org.
return DEMUX;
case 21:
return EVENT;
+ case 22:
+ return ORCFILEMERGE;
+ case 23:
+ return RCFILEMERGE;
default:
return null;
}
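A quick sanity check (illustrative only, not part of this patch) of the two wire values added to the generated enum above:

    import org.apache.hadoop.hive.ql.plan.api.OperatorType;

    public class OperatorTypeCheck {
      public static void main(String[] args) {
        // findByValue() is the generated switch shown in the hunk above.
        assert OperatorType.findByValue(22) == OperatorType.ORCFILEMERGE;
        assert OperatorType.findByValue(23) == OperatorType.RCFILEMERGE;
        assert OperatorType.ORCFILEMERGE.getValue() == 22;
      }
    }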
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Fast file merge operator for ORC and RCFile. This is an abstract class that
+ * does not process any rows. Refer to {@link org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator}
+ * or {@link org.apache.hadoop.hive.ql.exec.RCFileMergeOperator} for details.
+ */
+public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
+ extends Operator<T> implements Serializable {
+
+ public static final String BACKUP_PREFIX = "_backup.";
+ public static final Log LOG = LogFactory
+ .getLog(AbstractFileMergeOperator.class);
+
+ protected JobConf jc;
+ protected FileSystem fs;
+ protected boolean autoDelete;
+ protected boolean exception;
+ protected Path outPath;
+ protected Path finalPath;
+ protected Path dpPath;
+ protected Path tmpPath;
+ protected Path taskTmpPath;
+ protected int listBucketingDepth;
+ protected boolean hasDynamicPartitions;
+ protected boolean isListBucketingAlterTableConcatenate;
+ protected boolean tmpPathFixedConcatenate;
+ protected boolean tmpPathFixed;
+ protected Set<Path> incompatFileSet;
+ protected transient DynamicPartitionCtx dpCtx;
+
+ @Override
+ public void initializeOp(Configuration hconf) throws HiveException {
+ super.initializeOp(hconf);
+ this.jc = new JobConf(hconf);
+ incompatFileSet = new HashSet<Path>();
+ autoDelete = false;
+ exception = false;
+ tmpPathFixed = false;
+ tmpPathFixedConcatenate = false;
+ outPath = null;
+ finalPath = null;
+ dpPath = null;
+ tmpPath = null;
+ taskTmpPath = null;
+ dpCtx = conf.getDpCtx();
+ hasDynamicPartitions = conf.hasDynamicPartitions();
+ isListBucketingAlterTableConcatenate = conf
+ .isListBucketingAlterTableConcatenate();
+ listBucketingDepth = conf.getListBucketingDepth();
+ Path specPath = conf.getOutputPath();
+ updatePaths(Utilities.toTempPath(specPath),
+ Utilities.toTaskTempPath(specPath));
+ try {
+ fs = specPath.getFileSystem(hconf);
+ autoDelete = fs.deleteOnExit(outPath);
+ } catch (IOException e) {
+ this.exception = true;
+ throw new HiveException("Failed to initialize AbstractFileMergeOperator",
+ e);
+ }
+ }
+
+ // sets up temp and task temp path
+ private void updatePaths(Path tp, Path ttp) {
+ String taskId = Utilities.getTaskId(jc);
+ tmpPath = tp;
+ taskTmpPath = ttp;
+ finalPath = new Path(tp, taskId);
+ outPath = new Path(ttp, Utilities.toTempPath(taskId));
+ }
+
+ /**
+ * Fixes tmpPath to point to the correct partition. initializeOp() will
+ * set tmpPath and taskTmpPath based on the root table directory. So initially,
+ * tmpPath will be <prefix>/_tmp.-ext-10000 and taskTmpPath will be
+ * <prefix>/_task_tmp.-ext-10000. The depth of these two paths will be 0.
+ * Now, in case of dynamic partitioning or list bucketing the inputPath will
+ * have additional sub-directories under root table directory. This function
+ * updates the tmpPath and taskTmpPath to reflect these additional
+ * subdirectories. It updates tmpPath and taskTmpPath in the following way
+ * 1. finds out the difference in path based on depthDiff provided
+ * and saves the path difference in newPath
+ * 2. newPath is used to update the existing tmpPath and taskTmpPath similar
+ * to the way initializeOp() does.
+ *
+ * Note: The path difference between inputPath and tmpPath can be DP or DP+LB.
+ * This method handles both cases automatically.
+ *
+ * Continuing the example above, if inputPath is <prefix>/-ext-10000/hr=a1/,
+ * newPath will be hr=a1/. Then, tmpPath and taskTmpPath will be updated to
+ * <prefix>/-ext-10000/hr=a1/_tmp.-ext-10000 and
+ * <prefix>/-ext-10000/hr=a1/_task_tmp.-ext-10000 respectively.
+ * The test list_bucket_dml_6.q covers this case: DP + LB + multiple skewed
+ * values + merge.
+ *
+ * @param inputPath - input path
+ * @throws java.io.IOException
+ */
+ protected void fixTmpPath(Path inputPath, int depthDiff) throws IOException {
+
+ // don't need to update tmp paths when there is no depth difference in paths
+ if (depthDiff <= 0) {
+ return;
+ }
+
+ dpPath = inputPath;
+ Path newPath = new Path(".");
+
+ // Build the path from bottom up
+ while (inputPath != null && depthDiff > 0) {
+ newPath = new Path(inputPath.getName(), newPath);
+ depthDiff--;
+ inputPath = inputPath.getParent();
+ }
+
+ Path newTmpPath = new Path(tmpPath, newPath);
+ Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+ if (!fs.exists(newTmpPath)) {
+ fs.mkdirs(newTmpPath);
+ }
+ updatePaths(newTmpPath, newTaskTmpPath);
+ }
+
+ /**
+ * Validates that each input path belongs to the same partition since each
+ * mapper merges the input to a single output directory
+ *
+ * @param inputPath - input path
+ */
+ protected void checkPartitionsMatch(Path inputPath) throws IOException {
+ if (!dpPath.equals(inputPath)) {
+ // Temp partition input path does not match the existing temp path
+ String msg = "Multiple partitions for one merge mapper: " + dpPath +
+ " NOT EQUAL TO "
+ + inputPath;
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ }
+
+ protected void fixTmpPath(Path path) throws IOException {
+
+ // Fix temp path for alter table ... concatenate
+ if (isListBucketingAlterTableConcatenate) {
+ if (this.tmpPathFixedConcatenate) {
+ checkPartitionsMatch(path);
+ } else {
+ fixTmpPath(path, listBucketingDepth);
+ tmpPathFixedConcatenate = true;
+ }
+ } else {
+ if (hasDynamicPartitions || (listBucketingDepth > 0)) {
+ if (tmpPathFixed) {
+ checkPartitionsMatch(path);
+ } else {
+ // We haven't fixed the TMP path for this mapper yet
+ int depthDiff = path.depth() - tmpPath.depth();
+ fixTmpPath(path, depthDiff);
+ tmpPathFixed = true;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ try {
+ if (!exception) {
+ FileStatus fss = fs.getFileStatus(outPath);
+ if (!fs.rename(outPath, finalPath)) {
+ throw new IOException(
+ "Unable to rename " + outPath + " to " + finalPath);
+ }
+ LOG.info("renamed path " + outPath + " to " + finalPath + " . File" +
+ " size is "
+ + fss.getLen());
+
+ // move any incompatible files to final path
+ if (!incompatFileSet.isEmpty()) {
+ for (Path incompatFile : incompatFileSet) {
+ String fileName = incompatFile.getName();
+ Path destFile = new Path(finalPath.getParent(), fileName);
+ try {
+ Utilities.renameOrMoveFiles(fs, incompatFile, destFile);
+ LOG.info("Moved incompatible file " + incompatFile + " to " +
+ destFile);
+ } catch (HiveException e) {
+ LOG.error("Unable to move " + incompatFile + " to " + destFile);
+ throw new IOException(e);
+ }
+ }
+ }
+ } else {
+ if (!autoDelete) {
+ fs.delete(outPath, true);
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException("Failed to close AbstractFileMergeOperator", e);
+ }
+ }
+
+ @Override
+ public void jobCloseOp(Configuration hconf, boolean success)
+ throws HiveException {
+ try {
+ Path outputDir = conf.getOutputPath();
+ FileSystem fs = outputDir.getFileSystem(hconf);
+ Path backupPath = backupOutputPath(fs, outputDir);
+ Utilities
+ .mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(),
+ null, reporter);
+ if (success) {
+ LOG.info("jobCloseOp moved merged files to output dir: " + outputDir);
+ }
+ if (backupPath != null) {
+ fs.delete(backupPath, true);
+ }
+ } catch (IOException e) {
+ throw new HiveException("Failed jobCloseOp for AbstractFileMergeOperator",
+ e);
+ }
+ super.jobCloseOp(hconf, success);
+ }
+
+ private Path backupOutputPath(FileSystem fs, Path outpath)
+ throws IOException, HiveException {
+ if (fs.exists(outpath)) {
+ Path backupPath = new Path(outpath.getParent(),
+ BACKUP_PREFIX + outpath.getName());
+ Utilities.rename(fs, outpath, backupPath);
+ return backupPath;
+ } else {
+ return null;
+ }
+ }
+}
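The fixTmpPath(Path, int) javadoc above walks through rebuilding the dynamic-partition suffix bottom-up; here is a minimal standalone sketch of that loop (paths and depth are illustrative, not part of this patch):

    import org.apache.hadoop.fs.Path;

    public class FixTmpPathSketch {
      public static void main(String[] args) {
        Path inputPath = new Path("/warehouse/t/-ext-10000/hr=a1");
        Path tmpPath = new Path("/warehouse/t/_tmp.-ext-10000");
        int depthDiff = inputPath.depth() - tmpPath.depth(); // 1: the hr=a1 level
        Path newPath = new Path(".");
        // Build the partition suffix from the bottom up, as fixTmpPath does.
        while (inputPath != null && depthDiff > 0) {
          newPath = new Path(inputPath.getName(), newPath); // prepend "hr=a1"
          depthDiff--;
          inputPath = inputPath.getParent();
        }
        // Resolves to /warehouse/t/_tmp.-ext-10000/hr=a1 (the "." is a no-op).
        System.out.println(new Path(tmpPath, newPath));
      }
    }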
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Sat Sep 13 00:39:26 2014
@@ -18,32 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.hadoop.util.StringUtils.stringifyException;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.io.Writer;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -88,8 +62,9 @@ import org.apache.hadoop.hive.ql.QueryPl
import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.merge.MergeTask;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
@@ -133,14 +108,19 @@ import org.apache.hadoop.hive.ql.plan.De
import org.apache.hadoop.hive.ql.plan.DropDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.DropIndexDesc;
import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
import org.apache.hadoop.hive.ql.plan.GrantDesc;
import org.apache.hadoop.hive.ql.plan.GrantRevokeRoleDDL;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.LockTableDesc;
import org.apache.hadoop.hive.ql.plan.MsckDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
import org.apache.hadoop.hive.ql.plan.PrivilegeDesc;
import org.apache.hadoop.hive.ql.plan.PrivilegeObjectDesc;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
import org.apache.hadoop.hive.ql.plan.RevokeDesc;
import org.apache.hadoop.hive.ql.plan.RoleDDLDesc;
@@ -194,6 +174,33 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.hive.common.util.AnnotationUtils;
import org.stringtemplate.v4.ST;
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.hadoop.util.StringUtils.stringifyException;
+
/**
* DDLTask implementation.
*
@@ -546,15 +553,39 @@ public class DDLTask extends Task<DDLWor
*/
private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc)
throws HiveException {
+ ListBucketingCtx lbCtx = mergeFilesDesc.getLbCtx();
+ boolean lbatc = lbCtx == null ? false : lbCtx.isSkewedStoredAsDir();
+ int lbd = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel();
+
// merge work only needs input and output.
- MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
- mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass());
+ MergeFileWork mergeWork = new MergeFileWork(mergeFilesDesc.getInputDir(),
+ mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass().getName());
mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx());
mergeWork.resolveConcatenateMerge(db.getConf());
mergeWork.setMapperCannotSpanPartns(true);
- mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass());
+ mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass().getName());
+ final FileMergeDesc fmd;
+ if (mergeFilesDesc.getInputFormatClass().equals(RCFileInputFormat.class)) {
+ fmd = new RCFileMergeDesc();
+ } else {
+ // safe to assume else is ORC as semantic analyzer will check for RC/ORC
+ fmd = new OrcFileMergeDesc();
+ }
+
+ fmd.setDpCtx(null);
+ fmd.setHasDynamicPartitions(false);
+ fmd.setListBucketingAlterTableConcatenate(lbatc);
+ fmd.setListBucketingDepth(lbd);
+ fmd.setOutputPath(mergeFilesDesc.getOutputDir());
+
+ Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(fmd);
+
+ LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork =
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+ aliasToWork.put(mergeFilesDesc.getInputDir().toString(), mergeOp);
+ mergeWork.setAliasToWork(aliasToWork);
DriverContext driverCxt = new DriverContext();
- MergeTask taskExec = new MergeTask();
+ MergeFileTask taskExec = new MergeFileTask();
taskExec.initialize(db.getConf(), null, driverCxt);
taskExec.setWork(mergeWork);
taskExec.setQueryPlan(this.getQueryPlan());
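The hunk above builds the merge plan from a FileMergeDesc instead of a dedicated mapper; a minimal sketch of that desc-to-operator step in isolation (not part of this patch, output path illustrative):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorFactory;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;
    import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;

    public class MergeOpSketch {
      public static void main(String[] args) {
        OrcFileMergeDesc fmd = new OrcFileMergeDesc();
        fmd.setHasDynamicPartitions(false);
        fmd.setListBucketingDepth(0);
        fmd.setOutputPath(new Path("/tmp/merge-out")); // illustrative path
        // Resolved via the OperatorFactory mappings added in this commit.
        Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(fmd);
        System.out.println(mergeOp.getName()); // "OFM"
      }
    }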
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Sat Sep 13 00:39:26 2014
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.merge.MergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -47,7 +47,13 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol;
import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
@@ -55,7 +61,12 @@ import org.apache.hadoop.util.StringUtil
import java.io.IOException;
import java.io.Serializable;
import java.security.AccessControlException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
/**
* MoveTask implementation.
@@ -294,7 +305,7 @@ public class MoveTask extends Task<MoveW
while (task.getParentTasks() != null && task.getParentTasks().size() == 1) {
task = (Task)task.getParentTasks().get(0);
// If it was a merge task or a local map reduce task, nothing can be inferred
- if (task instanceof MergeTask || task instanceof MapredLocalTask) {
+ if (task instanceof MergeFileTask || task instanceof MapredLocalTask) {
break;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Sat Sep 13 00:39:26 2014
@@ -18,10 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -53,7 +49,9 @@ import org.apache.hadoop.hive.ql.plan.Li
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MuxDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -62,6 +60,10 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
/**
* OperatorFactory.
*
@@ -108,6 +110,10 @@ public final class OperatorFactory {
AppMasterEventOperator.class));
opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
AppMasterEventOperator.class));
+ opvec.add(new OpTuple<RCFileMergeDesc>(RCFileMergeDesc.class,
+ RCFileMergeOperator.class));
+ opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
+ OrcFileMergeOperator.class));
}
static {
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileKeyWrapper;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileValueWrapper;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.shims.CombineHiveKey;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Fast file merge operator for ORC files.
+ */
+public class OrcFileMergeOperator extends
+ AbstractFileMergeOperator<OrcFileMergeDesc> {
+ public final static Log LOG = LogFactory.getLog("OrcFileMergeOperator");
+
+ // These parameters must match for all ORC files involved in merging. If a
+ // file does not match them, it is added to the incompatible file set and
+ // is not merged.
+ CompressionKind compression = null;
+ long compressBuffSize = 0;
+ List<Integer> version;
+ int columnCount = 0;
+ int rowIndexStride = 0;
+
+ Writer outWriter;
+ Path prevPath;
+ private Reader reader;
+ private FSDataInputStream fdis;
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ Object[] keyValue = (Object[]) row;
+ processKeyValuePairs(keyValue[0], keyValue[1]);
+ }
+
+ private void processKeyValuePairs(Object key, Object value)
+ throws HiveException {
+ try {
+ OrcFileValueWrapper v;
+ OrcFileKeyWrapper k;
+ if (key instanceof CombineHiveKey) {
+ k = (OrcFileKeyWrapper) ((CombineHiveKey) key).getKey();
+ } else {
+ k = (OrcFileKeyWrapper) key;
+ }
+
+ fixTmpPath(k.getInputPath().getParent());
+
+ v = (OrcFileValueWrapper) value;
+
+ if (prevPath == null) {
+ prevPath = k.getInputPath();
+ reader = OrcFile.createReader(fs, k.getInputPath());
+ LOG.info("ORC merge file input path: " + k.getInputPath());
+ }
+
+ // Store the ORC configuration from the first file. All other files must
+ // match this configuration before merging; otherwise they will not be
+ // merged.
+ if (outWriter == null) {
+ compression = k.getCompression();
+ compressBuffSize = k.getCompressBufferSize();
+ version = k.getVersionList();
+ columnCount = k.getTypes().get(0).getSubtypesCount();
+ rowIndexStride = k.getRowIndexStride();
+
+ // block size and stripe size will be from config
+ outWriter = OrcFile.createWriter(outPath,
+ OrcFile.writerOptions(jc).compress(compression)
+ .inspector(reader.getObjectInspector()));
+ LOG.info("ORC merge file output path: " + outPath);
+ }
+
+ if (!checkCompatibility(k)) {
+ incompatFileSet.add(k.getInputPath());
+ return;
+ }
+
+ // a new input file; open a fresh reader for it
+ if (!k.getInputPath().equals(prevPath)) {
+ reader = OrcFile.createReader(fs, k.getInputPath());
+ }
+
+ // initialize buffer to read the entire stripe
+ byte[] buffer = new byte[(int) v.getStripeInformation().getLength()];
+ fdis = fs.open(k.getInputPath());
+ fdis.readFully(v.getStripeInformation().getOffset(), buffer, 0,
+ (int) v.getStripeInformation().getLength());
+
+ // append the stripe buffer to the new ORC file
+ outWriter.appendStripe(buffer, 0, buffer.length, v.getStripeInformation(),
+ v.getStripeStatistics());
+
+ LOG.info("Merged stripe from file " + k.getInputPath() + " [ offset : "
+ + v.getStripeInformation().getOffset() + " length: "
+ + v.getStripeInformation().getLength() + " ]");
+
+ // add user metadata to the footer, if any
+ if (v.isLastStripeInFile()) {
+ outWriter.appendUserMetadata(v.getUserMetadata());
+ }
+ } catch (Throwable e) {
+ this.exception = true;
+ closeOp(true);
+ throw new HiveException(e);
+ }
+ }
+
+ private boolean checkCompatibility(OrcFileKeyWrapper k) {
+ // check compatibility with subsequent files
+ if ((k.getTypes().get(0).getSubtypesCount() != columnCount)) {
+ LOG.info("Incompatible ORC file merge! Column counts does not match for "
+ + k.getInputPath());
+ return false;
+ }
+
+ if (!k.getCompression().equals(compression)) {
+ LOG.info("Incompatible ORC file merge! Compression codec does not match" +
+ " for " + k.getInputPath());
+ return false;
+ }
+
+ if (k.getCompressBufferSize() != compressBuffSize) {
+ LOG.info("Incompatible ORC file merge! Compression buffer size does not" +
+ " match for " + k.getInputPath());
+ return false;
+ }
+
+ if (!k.getVersionList().equals(version)) {
+ LOG.info("Incompatible ORC file merge! Version does not match for "
+ + k.getInputPath());
+ return false;
+ }
+
+ if (k.getRowIndexStride() != rowIndexStride) {
+ LOG.info("Incompatible ORC file merge! Row index stride does not match" +
+ " for " + k.getInputPath());
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.ORCFILEMERGE;
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "OFM";
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ // close writer
+ if (outWriter == null) {
+ return;
+ }
+
+ try {
+ if (fdis != null) {
+ fdis.close();
+ fdis = null;
+ }
+
+ outWriter.close();
+ outWriter = null;
+ } catch (IOException e) {
+ throw new HiveException("Unable to close OrcFileMergeOperator", e);
+ }
+ super.closeOp(abort);
+ }
+}
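processKeyValuePairs() above copies each stripe verbatim with readFully() and appendStripe(); a minimal helper sketch of the raw-byte read (illustrative, not part of this patch):

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.StripeInformation;

    import java.io.IOException;

    public final class StripeBytesSketch {
      /** Reads one stripe's raw bytes, mirroring the merge loop above. */
      static byte[] readStripe(FileSystem fs, Path file, StripeInformation stripe)
          throws IOException {
        byte[] buffer = new byte[(int) stripe.getLength()];
        FSDataInputStream in = fs.open(file);
        try {
          in.readFully(stripe.getOffset(), buffer, 0, buffer.length);
          return buffer;
        } finally {
          in.close();
        }
      }
    }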
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.hive.shims.CombineHiveKey;
+
+import java.io.IOException;
+
+/**
+ * Fast file merge operator for RC files.
+ */
+public class RCFileMergeOperator
+ extends AbstractFileMergeOperator<RCFileMergeDesc> {
+ public final static Log LOG = LogFactory.getLog("RCFileMergeOperator");
+
+ RCFile.Writer outWriter;
+ CompressionCodec codec = null;
+ int columnNumber = 0;
+
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
+ Object[] keyValue = (Object[]) row;
+ processKeyValuePairs(keyValue[0], keyValue[1]);
+ }
+
+ private void processKeyValuePairs(Object k, Object v)
+ throws HiveException {
+ try {
+
+ RCFileKeyBufferWrapper key;
+ if (k instanceof CombineHiveKey) {
+ key = (RCFileKeyBufferWrapper) ((CombineHiveKey) k).getKey();
+ } else {
+ key = (RCFileKeyBufferWrapper) k;
+ }
+ RCFileValueBufferWrapper value = (RCFileValueBufferWrapper) v;
+
+ fixTmpPath(key.getInputPath().getParent());
+
+ if (outWriter == null) {
+ codec = key.getCodec();
+ columnNumber = key.getKeyBuffer().getColumnNumber();
+ RCFileOutputFormat.setColumnNumber(jc, columnNumber);
+ outWriter = new RCFile.Writer(fs, jc, outPath, null, codec);
+ }
+
+ boolean sameCodec = ((codec == key.getCodec()) || codec.getClass().equals(
+ key.getCodec().getClass()));
+
+ if ((key.getKeyBuffer().getColumnNumber() != columnNumber) ||
+ (!sameCodec)) {
+ throw new IOException( "RCFileMerge failed because the input files" +
+ " use different CompressionCodec or have different column number" +
+ " setting.");
+ }
+
+ outWriter.flushBlock(key.getKeyBuffer(), value.getValueBuffer(),
+ key.getRecordLength(), key.getKeyLength(),
+ key.getCompressedKeyLength());
+ } catch (Throwable e) {
+ this.exception = true;
+ closeOp(true);
+ throw new HiveException(e);
+ }
+ }
+
+ @Override
+ public void closeOp(boolean abort) throws HiveException {
+ // close writer
+ if (outWriter == null) {
+ return;
+ }
+
+ try {
+ outWriter.close();
+ } catch (IOException e) {
+ throw new HiveException("Unable to close RCFileMergeOperator", e);
+ }
+ outWriter = null;
+
+ super.closeOp(abort);
+ }
+
+ @Override
+ public OperatorType getType() {
+ return OperatorType.RCFILEMERGE;
+ }
+
+ /**
+ * @return the name of the operator
+ */
+ @Override
+ public String getName() {
+ return getOperatorName();
+ }
+
+ static public String getOperatorName() {
+ return "RFM";
+ }
+}
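The codec test above treats two writers as compatible when they share a codec instance or codec class; a null-safe restatement of that check (illustrative, not part of this patch):

    import org.apache.hadoop.io.compress.CompressionCodec;

    public final class CodecCompatSketch {
      /** Same-instance or same-class codecs are mergeable; nulls match nulls. */
      static boolean sameCodec(CompressionCodec a, CompressionCodec b) {
        return (a == b)
            || (a != null && b != null && a.getClass().equals(b.getClass()));
      }
    }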
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Sat Sep 13 00:39:26 2014
@@ -28,8 +28,8 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
-import org.apache.hadoop.hive.ql.io.merge.MergeTask;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
@@ -94,8 +94,8 @@ public final class TaskFactory {
taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, StatsNoJobTask.class));
taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, ColumnStatsTask.class));
taskvec.add(new TaskTuple<ColumnStatsUpdateWork>(ColumnStatsUpdateWork.class, ColumnStatsUpdateTask.class));
- taskvec.add(new TaskTuple<MergeWork>(MergeWork.class,
- MergeTask.class));
+ taskvec.add(new TaskTuple<MergeFileWork>(MergeFileWork.class,
+ MergeFileTask.class));
taskvec.add(new TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class,
DependencyCollectionTask.class));
taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class,
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Sep 13 00:39:26 2014
@@ -18,67 +18,11 @@
package org.apache.hadoop.hive.ql.exec;
-import java.beans.DefaultPersistenceDelegate;
-import java.beans.Encoder;
-import java.beans.ExceptionListener;
-import java.beans.Expression;
-import java.beans.PersistenceDelegate;
-import java.beans.Statement;
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.SQLTransientException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
-
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
import org.antlr.runtime.CommonToken;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
@@ -122,9 +66,8 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
-import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
@@ -181,11 +124,66 @@ import org.apache.hadoop.util.Progressab
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
+import java.beans.DefaultPersistenceDelegate;
+import java.beans.Encoder;
+import java.beans.ExceptionListener;
+import java.beans.Expression;
+import java.beans.PersistenceDelegate;
+import java.beans.Statement;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLTransientException;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
/**
* Utilities.
@@ -352,9 +350,8 @@ public final class Utilities {
if(MAP_PLAN_NAME.equals(name)){
if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
gWork = deserializePlan(in, MapWork.class, conf);
- } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS)) ||
- OrcFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
- gWork = deserializePlan(in, MergeWork.class, conf);
+ } else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+ gWork = deserializePlan(in, MergeFileWork.class, conf);
} else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
} else if(PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Sat Sep 13 00:39:26 2014
@@ -17,22 +17,9 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.auth.login.LoginException;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -54,6 +41,9 @@ import org.apache.hadoop.hive.ql.io.Comb
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -68,6 +58,7 @@ import org.apache.hadoop.hive.shims.Hado
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
@@ -113,9 +104,20 @@ import org.apache.tez.runtime.library.co
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import javax.security.auth.login.LoginException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* DagUtils. DagUtils is a collection of helper methods to convert
@@ -212,6 +214,16 @@ public class DagUtils {
conf.set("mapred.mapper.class", ExecMapper.class.getName());
conf.set("mapred.input.format.class", inpFormat);
+ if (mapWork instanceof MergeFileWork) {
+ MergeFileWork mfWork = (MergeFileWork) mapWork;
+ // This mapper class is used for serialization/deserialization of the
+ // merge file work.
+ conf.set("mapred.mapper.class", MergeFileMapper.class.getName());
+ conf.set("mapred.input.format.class", mfWork.getInputformat());
+ conf.setClass("mapred.output.format.class", MergeFileOutputFormat.class,
+ FileOutputFormat.class);
+ }
+
return conf;
}
@@ -486,6 +498,21 @@ public class DagUtils {
}
}
+ if (mapWork instanceof MergeFileWork) {
+ Path outputPath = ((MergeFileWork) mapWork).getOutputDir();
+ // prepare the tmp output directory. The output tmp directory should
+ // exist before jobClose (before renaming after job completion)
+ Path tempOutPath = Utilities.toTempPath(outputPath);
+ try {
+ if (!fs.exists(tempOutPath)) {
+ fs.mkdirs(tempOutPath);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Can't make path " + outputPath + " : " + e.getMessage());
+ }
+ }
+
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
&& !mapWork.isUseOneNullRowInputFormat()) {
@@ -515,9 +542,13 @@ public class DagUtils {
}
UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
- map = Vertex.create(mapWork.getName(),
- ProcessorDescriptor.create(MapTezProcessor.class.getName()).
- setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+ String procClassName = MapTezProcessor.class.getName();
+ if (mapWork instanceof MergeFileWork) {
+ procClassName = MergeFileTezProcessor.class.getName();
+ }
+ map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
+ .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+
map.setTaskEnvironment(getContainerEnvironment(conf, true));
map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
@@ -784,7 +815,7 @@ public class DagUtils {
}
/**
- * @param path - the path from which we try to determine the resource base name
+ * @param path - the string from which we try to determine the resource base name
* @return the name of the resource from a given path string.
*/
public String getResourceBaseName(Path path) {
@@ -831,7 +862,8 @@ public class DagUtils {
conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname,
HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal);
long sleepInterval = HiveConf.getTimeVar(
- conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
+ conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL,
+ TimeUnit.MILLISECONDS);
LOG.info("Number of wait attempts: " + waitAttempts + ". Wait interval: "
+ sleepInterval);
boolean found = false;
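
The vertex-creation change above boils down to a processor swap. A minimal
sketch of that pattern, reusing the names from the hunks (mapWork, numTasks,
serializedConf and conf are assumed to be in scope as shown there):

    // Minimal sketch of the pattern in the hunk above: choose the processor
    // class per work type, then build the Tez vertex with the user payload.
    String procClassName = (mapWork instanceof MergeFileWork)
        ? MergeFileTezProcessor.class.getName()
        : MapTezProcessor.class.getName();
    Vertex map = Vertex.create(mapWork.getName(),
        ProcessorDescriptor.create(procClassName)
            .setUserPayload(serializedConf),
        numTasks, getContainerResource(conf));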
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+import java.util.Map;
+
+/**
+ * Record processor for fast merging of files.
+ */
+public class MergeFileRecordProcessor extends RecordProcessor {
+
+ public static final Log LOG = LogFactory
+ .getLog(MergeFileRecordProcessor.class);
+
+ protected Operator<? extends OperatorDesc> mergeOp;
+ private final ExecMapperContext execContext = new ExecMapperContext();
+ protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
+ private MergeFileWork mfWork;
+ private boolean abort = false;
+ private Object[] row = new Object[2];
+
+ @Override
+ void init(JobConf jconf, ProcessorContext processorContext,
+ MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
+ Map<String, LogicalOutput> outputs) throws Exception {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+ super.init(jconf, processorContext, mrReporter, inputs, outputs);
+
+ // Update the JobConf using MRInput; info such as the filename comes via this
+ MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
+ Configuration updatedConf = mrInput.getConfigUpdates();
+ if (updatedConf != null) {
+ for (Map.Entry<String, String> entry : updatedConf) {
+ jconf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ createOutputMap();
+ // Start all the Outputs.
+ for (Map.Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
+ outputEntry.getValue().start();
+ ((TezProcessor.TezKVOutputCollector) outMap.get(outputEntry.getKey()))
+ .initialize();
+ }
+
+ org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory
+ .getCache(jconf);
+ try {
+ execContext.setJc(jconf);
+ // create map and fetch operators
+ MapWork mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+ if (mapWork == null) {
+ mapWork = Utilities.getMapWork(jconf);
+ if (mapWork instanceof MergeFileWork) {
+ mfWork = (MergeFileWork) mapWork;
+ } else {
+ throw new RuntimeException("MapWork should be an instance of" +
+ " MergeFileWork.");
+ }
+ cache.cache(MAP_PLAN_KEY, mapWork);
+ } else {
+ Utilities.setMapWork(jconf, mapWork);
+ }
+
+ String alias = mfWork.getAliasToWork().keySet().iterator().next();
+ mergeOp = mfWork.getAliasToWork().get(alias);
+ LOG.info(mergeOp.dump(0));
+
+ MapredContext.init(true, new JobConf(jconf));
+ ((TezContext) MapredContext.get()).setInputs(inputs);
+ mergeOp.setExecContext(execContext);
+ mergeOp.initializeLocalWork(jconf);
+ mergeOp.initialize(jconf, null);
+
+ OperatorUtils.setChildrenCollector(mergeOp.getChildOperators(), outMap);
+ mergeOp.setReporter(reporter);
+ MapredContext.get().setReporter(reporter);
+ } catch (Throwable e) {
+ if (e instanceof OutOfMemoryError) {
+ // will this be true here?
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ throw new RuntimeException("Map operator initialization failed", e);
+ }
+ }
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
+ }
+
+ @Override
+ void run() throws Exception {
+ MRInputLegacy in = TezProcessor.getMRInput(inputs);
+ KeyValueReader reader = in.getReader();
+
+ //process records until done
+ while (reader.next()) {
+ boolean needMore = processRow(reader.getCurrentKey(),
+ reader.getCurrentValue());
+ if (!needMore) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ void close() {
+ // check if there are IOExceptions
+ if (!abort) {
+ abort = execContext.getIoCxt().getIOExceptions();
+ }
+
+ // detecting failed executions by exceptions thrown by the operator tree
+ try {
+ if (mergeOp == null || mfWork == null) {
+ return;
+ }
+ mergeOp.close(abort);
+
+ if (isLogInfoEnabled) {
+ logCloseInfo();
+ }
+ ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter);
+ mergeOp.preorderMap(rps);
+ } catch (Exception e) {
+ if (!abort) {
+ // signal new failure to map-reduce
+ l4j.error("Hit error while closing operators - failing tree");
+ throw new RuntimeException("Hive Runtime Error while closing operators",
+ e);
+ }
+ } finally {
+ Utilities.clearWorkMap();
+ MapredContext.close();
+ }
+ }
+
+ /**
+ * @param key key to process
+ * @param value value to process
+ * @return true if it is not done and can take more inputs
+ */
+ private boolean processRow(Object key, Object value) {
+ // reset the execContext for each new row
+ execContext.resetRow();
+
+ try {
+ if (mergeOp.getDone()) {
+ return false; //done
+ } else {
+ row[0] = key;
+ row[1] = value;
+ mergeOp.processOp(row, 0);
+ if (isLogInfoEnabled) {
+ logProgress();
+ }
+ }
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ l4j.fatal(StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+ return true; //give me more
+ }
+
+}
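
init() above deserializes the merge plan at most once per container by going
through Hive's object cache. A minimal sketch of that retrieve-or-deserialize
pattern, using only calls that appear in the class above:

    // Reuse a cached MapWork when present; otherwise deserialize it from the
    // job configuration and cache it for later tasks in this container.
    org.apache.hadoop.hive.ql.exec.ObjectCache cache =
        ObjectCacheFactory.getCache(jconf);
    MapWork mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
    if (mapWork == null) {
      mapWork = Utilities.getMapWork(jconf); // deserialize the plan
      cache.cache(MAP_PLAN_KEY, mapWork);
    } else {
      Utilities.setMapWork(jconf, mapWork);  // re-register the cached plan
    }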
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileTezProcessor.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.ProcessorContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Tez processor for fast file merging. This is the same as TezProcessor
+ * except that it uses a different record processor.
+ */
+public class MergeFileTezProcessor extends TezProcessor {
+
+ public MergeFileTezProcessor(ProcessorContext context) {
+ super(context);
+ }
+
+ @Override
+ public void run(Map<String, LogicalInput> inputs,
+ Map<String, LogicalOutput> outputs) throws Exception {
+ rproc = new MergeFileRecordProcessor();
+ MRInputLegacy mrInput = getMRInput(inputs);
+ try {
+ mrInput.init();
+ } catch (IOException e) {
+ throw new RuntimeException("Failed while initializing MRInput", e);
+ }
+ initializeAndRunProcessor(inputs, outputs);
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Sat Sep 13 00:39:26 2014
@@ -16,13 +16,8 @@
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.net.URLClassLoader;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
@@ -34,8 +29,12 @@ import org.apache.tez.runtime.api.Logica
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
/**
* Process input from tez LogicalInput and write output
@@ -66,7 +65,7 @@ public abstract class RecordProcessor {
/**
* Common initialization code for RecordProcessors
* @param jconf
- * @param processorContext the {@link TezProcessorContext}
+ * @param processorContext the {@link ProcessorContext}
* @param mrReporter
* @param inputs map of Input names to {@link LogicalInput}s
* @param outputs map of Output names to {@link LogicalOutput}s
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Sat Sep 13 00:39:26 2014
@@ -17,12 +17,6 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,6 +34,11 @@ import org.apache.tez.runtime.api.Logica
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.Map;
+
/**
* Hive processor for Tez that forms the vertices in Tez and processes the data.
* Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -51,13 +50,15 @@ public class TezProcessor extends Abstra
private static final Log LOG = LogFactory.getLog(TezProcessor.class);
protected boolean isMap = false;
- RecordProcessor rproc = null;
+ protected RecordProcessor rproc = null;
- private JobConf jobConf;
+ protected JobConf jobConf;
private static final String CLASS_NAME = TezProcessor.class.getName();
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ protected ProcessorContext processorContext;
+
protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
static {
@@ -121,9 +122,6 @@ public class TezProcessor extends Abstra
public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
throws Exception {
- Throwable originalThrowable = null;
-
- try{
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
// in case of broadcast-join read the broadcast edge inputs
// (possibly asynchronously)
@@ -142,14 +140,23 @@ public class TezProcessor extends Abstra
rproc = new ReduceRecordProcessor();
}
+ initializeAndRunProcessor(inputs, outputs);
+ }
+
+ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
+ Map<String, LogicalOutput> outputs)
+ throws Exception {
+ Throwable originalThrowable = null;
+ try {
TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
// Start the actual Inputs. After MRInput initialization.
- for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
+ for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
if (!cacheAccess.isInputCached(inputEntry.getKey())) {
LOG.info("Input: " + inputEntry.getKey() + " is not cached");
inputEntry.getValue().start();
} else {
- LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+ LOG.info("Input: " + inputEntry.getKey() +
+ " is already cached. Skipping start");
}
}
@@ -170,7 +177,7 @@ public class TezProcessor extends Abstra
}
try {
- if(rproc != null){
+ if (rproc != null) {
rproc.close();
}
} catch (Throwable t) {
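
Extracting initializeAndRunProcessor() turns TezProcessor into a template:
subclasses only pick the RecordProcessor. A hypothetical subclass (the class
name is illustrative, not part of this commit) would look like:

    import org.apache.tez.runtime.api.LogicalInput;
    import org.apache.tez.runtime.api.LogicalOutput;
    import org.apache.tez.runtime.api.ProcessorContext;

    import java.util.Map;

    // Hypothetical subclass: run() selects the record processor, while input
    // start-up, error handling and close logic stay in
    // TezProcessor.initializeAndRunProcessor().
    public class CustomTezProcessor extends TezProcessor {
      public CustomTezProcessor(ProcessorContext context) {
        super(context);
      }

      @Override
      public void run(Map<String, LogicalInput> inputs,
          Map<String, LogicalOutput> outputs) throws Exception {
        rproc = new MergeFileRecordProcessor();
        initializeAndRunProcessor(inputs, outputs);
      }
    }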
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileInputFormat.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileInputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileInputFormat.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public abstract class MergeFileInputFormat extends FileInputFormat {
+
+ @Override
+ public abstract RecordReader getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException;
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileMapper.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+/**
+ * Mapper for fast file merging of ORC and RC files. This is very similar to
+ * ExecMapper except that the root operator is an AbstractFileMergeOperator.
+ * This class name is used for serialization and deserialization of
+ * MergeFileWork.
+ */
+public class MergeFileMapper extends MapReduceBase implements Mapper {
+ public static final Log LOG = LogFactory.getLog("MergeFileMapper");
+ private static final String PLAN_KEY = "__MAP_PLAN__";
+
+ private JobConf jc;
+ private Operator<? extends OperatorDesc> op;
+ private AbstractFileMergeOperator mergeOp;
+ private Object[] row;
+ private boolean abort;
+
+ @Override
+ public void configure(JobConf job) {
+ jc = job;
+ ObjectCache cache = ObjectCacheFactory.getCache(job);
+ MapWork mapWork = (MapWork) cache.retrieve(PLAN_KEY);
+
+ // if the map work is found in the object cache then reuse it; otherwise
+ // retrieve the plan from the filesystem and cache it
+ if (mapWork == null) {
+ mapWork = Utilities.getMapWork(job);
+ cache.cache(PLAN_KEY, mapWork);
+ } else {
+ Utilities.setMapWork(job, mapWork);
+ }
+
+ try {
+ if (mapWork instanceof MergeFileWork) {
+ MergeFileWork mfWork = (MergeFileWork) mapWork;
+ String alias = mfWork.getAliasToWork().keySet().iterator().next();
+ op = mfWork.getAliasToWork().get(alias);
+ if (op instanceof AbstractFileMergeOperator) {
+ mergeOp = (AbstractFileMergeOperator) op;
+ mergeOp.initializeOp(jc);
+ row = new Object[2];
+ abort = false;
+ } else {
+ abort = true;
+ throw new RuntimeException(
+ "Merge file work's top operator should be an" +
+ " instance of AbstractFileMergeOperator");
+ }
+ } else {
+ abort = true;
+ throw new RuntimeException("Map work should be a merge file work.");
+ }
+ } catch (HiveException e) {
+ abort = true;
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ mergeOp.closeOp(abort);
+ } catch (HiveException e) {
+ throw new IOException(e);
+ }
+ super.close();
+ }
+
+ @Override
+ public void map(Object key, Object value, OutputCollector output,
+ Reporter reporter) throws IOException {
+
+ row[0] = key;
+ row[1] = value;
+ try {
+ mergeOp.processOp(row, 0);
+ } catch (HiveException e) {
+ abort = true;
+ throw new IOException(e);
+ }
+ }
+}
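
On the MR path this mapper is selected purely through configuration. A minimal
sketch of that wiring, reusing the property names from the DagUtils hunk
earlier in this commit (mfWork stands for the MergeFileWork in scope there):

    // Route merge work to MergeFileMapper via plain JobConf properties,
    // mirroring the DagUtils change at the top of this commit.
    JobConf conf = new JobConf();
    conf.set("mapred.mapper.class", MergeFileMapper.class.getName());
    conf.set("mapred.input.format.class", mfWork.getInputformat());
    conf.setClass("mapred.output.format.class", MergeFileOutputFormat.class,
        FileOutputFormat.class);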
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileOutputFormat.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileOutputFormat.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileOutputFormat.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+
+public class MergeFileOutputFormat extends
+ FileOutputFormat<Object, Object> {
+
+ @Override
+ public RecordWriter<Object, Object> getRecordWriter(FileSystem ignored, JobConf job, String name,
+ Progressable progress) throws IOException {
+ return new RecordWriter<Object, Object>() {
+ public void write(Object key, Object value) {
+ throw new RuntimeException("Should not be called");
+ }
+
+ public void close(Reporter reporter) {
+ }
+ };
+ }
+
+}
\ No newline at end of file