Posted to commits@hive.apache.org by pr...@apache.org on 2014/09/13 02:39:28 UTC
svn commit: r1624688 [2/9] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ itests/src/test/resources/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/h...
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper;
+import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHook;
+import org.apache.hadoop.hive.ql.exec.mr.Throttle;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Task for fast merging of ORC and RC files.
+ */
+public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
+ HadoopJobExecHook {
+
+ private transient JobConf job;
+ private HadoopJobExecHelper jobExecHelper;
+ private boolean success = true;
+
+ @Override
+ public void initialize(HiveConf conf, QueryPlan queryPlan,
+ DriverContext driverContext) {
+ super.initialize(conf, queryPlan, driverContext);
+ job = new JobConf(conf, MergeFileTask.class);
+ jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
+ }
+
+ @Override
+ public boolean requireLock() {
+ return true;
+ }
+
+ /**
+ * Start a new map-reduce job to do the merge; almost the same as ExecDriver.
+ */
+ @Override
+ public int execute(DriverContext driverContext) {
+
+ Context ctx = driverContext.getCtx();
+ boolean ctxCreated = false;
+ RunningJob rj = null;
+ int returnVal = 0;
+
+ try {
+ if (ctx == null) {
+ ctx = new Context(job);
+ ctxCreated = true;
+ }
+
+ ShimLoader.getHadoopShims().prepareJobOutput(job);
+ job.setInputFormat(work.getInputformatClass());
+ job.setOutputFormat(HiveOutputFormatImpl.class);
+ job.setMapperClass(MergeFileMapper.class);
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setNumReduceTasks(0);
+
+ // create the temp directories
+ Path outputPath = work.getOutputDir();
+ Path tempOutPath = Utilities.toTempPath(outputPath);
+ FileSystem fs = tempOutPath.getFileSystem(job);
+ if (!fs.exists(tempOutPath)) {
+ fs.mkdirs(tempOutPath);
+ }
+
+ // set job name
+ boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,
+ HiveConf.ConfVars.HADOOPJOBNAME));
+
+ String jobName = null;
+ if (noName && this.getQueryPlan() != null) {
+ int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+ jobName = Utilities.abbreviate(this.getQueryPlan().getQueryStr(),
+ maxlen - 6);
+ }
+
+ if (noName) {
+ // This is for a special case to ensure unit tests pass
+ HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME,
+ jobName != null ? jobName : "JOB" + Utilities.randGen.nextInt());
+ }
+
+ // add input path
+ addInputPaths(job, work);
+
+ // serialize work
+ Utilities.setMapWork(job, work, ctx.getMRTmpPath(), true);
+
+      // remove pwd from the conf file so that the job tracker doesn't show it in logs
+ String pwd = HiveConf.getVar(job, HiveConf.ConfVars.METASTOREPWD);
+ if (pwd != null) {
+ HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, "HIVE");
+ }
+
+ // submit the job
+ JobClient jc = new JobClient(job);
+
+ String addedJars = Utilities.getResourceFiles(job,
+ SessionState.ResourceType.JAR);
+ if (!addedJars.isEmpty()) {
+ job.set("tmpjars", addedJars);
+ }
+
+      // make this client wait if the job tracker is not behaving well.
+ Throttle.checkJobTracker(job, LOG);
+
+ // Finally SUBMIT the JOB!
+ rj = jc.submitJob(job);
+
+ returnVal = jobExecHelper.progress(rj, jc, null);
+ success = (returnVal == 0);
+
+ } catch (Exception e) {
+ String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
+ if (rj != null) {
+ mesg = "Ended Job = " + rj.getJobID() + mesg;
+ } else {
+ mesg = "Job Submission failed" + mesg;
+ }
+
+ // Has to use full name to make sure it does not conflict with
+ // org.apache.commons.lang.StringUtils
+ console.printError(mesg, "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+
+ success = false;
+ returnVal = 1;
+ } finally {
+ try {
+ if (ctxCreated) {
+ ctx.clear();
+ }
+ if (rj != null) {
+ if (returnVal != 0) {
+ rj.killJob();
+ }
+ HadoopJobExecHelper.runningJobs.remove(rj);
+ jobID = rj.getID().toString();
+ }
+        // run jobClose on the merge operators so they can commit their output
+ if (rj != null) {
+ if (work.getAliasToWork() != null) {
+ for (Operator<? extends OperatorDesc> op : work.getAliasToWork()
+ .values()) {
+ op.jobClose(job, success);
+ }
+ }
+ }
+ } catch (Exception e) {
+        // jobClose needs to execute successfully; otherwise fail the task
+ if (success) {
+ success = false;
+ returnVal = 3;
+ String mesg = "Job Commit failed with exception '" +
+ Utilities.getNameMessage(e) + "'";
+ console.printError(mesg, "\n" +
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ }
+ }
+
+ return returnVal;
+ }
+
+ private void addInputPaths(JobConf job, MergeFileWork work) {
+ for (Path path : work.getInputPaths()) {
+ FileInputFormat.addInputPath(job, path);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "MergeFileTask";
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.MAPRED;
+ }
+
+ @Override
+ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
+ return false;
+ }
+
+ @Override
+ public void logPlanProgress(SessionState ss) throws IOException {
+ // no op
+ }
+}
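
For orientation, a minimal driver sketch (an illustration under stated assumptions, not part of this commit): the task is handed a MergeFileWork (the work class added below), initialized, and executed. In Hive the task would normally be instantiated through TaskFactory; the direct construction and the no-arg DriverContext constructor below are assumptions, and the paths are placeholders.

import java.util.Arrays;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;

public class MergeFileTaskSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    MergeFileWork work = new MergeFileWork(
        Arrays.asList(new Path("/warehouse/t/ds=1")),   // files to merge
        new Path("/warehouse/t/.merge-out"),            // merged output dir
        OrcInputFormat.class.getName());                // source table format

    MergeFileTask task = new MergeFileTask();
    task.setWork(work);                                 // setWork comes from Task<T>
    DriverContext driverContext = new DriverContext();  // assumed no-arg ctor
    task.initialize(conf, null, driverContext);         // builds JobConf + exec helper
    int rc = task.execute(driverContext);               // submits the map-only MR job
    System.out.println("merge returned " + rc);         // 0 on success
  }
}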
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileWork.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.merge;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.mapred.InputFormat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+@Explain(displayName = "Merge File Operator")
+public class MergeFileWork extends MapWork {
+
+ private static final Log LOG = LogFactory.getLog(MergeFileWork.class);
+ private List<Path> inputPaths;
+ private Path outputDir;
+ private boolean hasDynamicPartitions;
+ private boolean isListBucketingAlterTableConcatenate;
+ private ListBucketingCtx listBucketingCtx;
+
+ // source table input format
+ private String srcTblInputFormat;
+
+ // internal input format used by CombineHiveInputFormat
+ private Class<? extends InputFormat> internalInputFormat;
+
+ public MergeFileWork(List<Path> inputPaths, Path outputDir,
+ String srcTblInputFormat) {
+ this(inputPaths, outputDir, false, srcTblInputFormat);
+ }
+
+ public MergeFileWork(List<Path> inputPaths, Path outputDir,
+ boolean hasDynamicPartitions,
+ String srcTblInputFormat) {
+ this.inputPaths = inputPaths;
+ this.outputDir = outputDir;
+ this.hasDynamicPartitions = hasDynamicPartitions;
+ this.srcTblInputFormat = srcTblInputFormat;
+ PartitionDesc partDesc = new PartitionDesc();
+ if (srcTblInputFormat.equals(OrcInputFormat.class.getName())) {
+ this.internalInputFormat = OrcFileStripeMergeInputFormat.class;
+ } else if (srcTblInputFormat.equals(RCFileInputFormat.class.getName())) {
+ this.internalInputFormat = RCFileBlockMergeInputFormat.class;
+ }
+ partDesc.setInputFileFormatClass(internalInputFormat);
+ if (this.getPathToPartitionInfo() == null) {
+ this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
+ }
+ for (Path path : this.inputPaths) {
+ this.getPathToPartitionInfo().put(path.toString(), partDesc);
+ }
+ this.isListBucketingAlterTableConcatenate = false;
+ }
+
+ public List<Path> getInputPaths() {
+ return inputPaths;
+ }
+
+ public void setInputPaths(List<Path> inputPaths) {
+ this.inputPaths = inputPaths;
+ }
+
+ public Path getOutputDir() {
+ return outputDir;
+ }
+
+ public void setOutputDir(Path outputDir) {
+ this.outputDir = outputDir;
+ }
+
+ @Override
+ public Long getMinSplitSize() {
+ return null;
+ }
+
+ @Override
+ public String getInputformat() {
+ return getInputformatClass().getName();
+ }
+
+ public Class<? extends InputFormat> getInputformatClass() {
+ return CombineHiveInputFormat.class;
+ }
+
+ @Override
+ public boolean isGatheringStats() {
+ return false;
+ }
+
+ public boolean hasDynamicPartitions() {
+ return this.hasDynamicPartitions;
+ }
+
+ public void setHasDynamicPartitions(boolean hasDynamicPartitions) {
+ this.hasDynamicPartitions = hasDynamicPartitions;
+ }
+
+ @Override
+ public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf,
+ Path path,
+ TableDesc tblDesc,
+ ArrayList<String> aliases,
+ PartitionDesc partDesc) {
+ super.resolveDynamicPartitionStoredAsSubDirsMerge(conf, path, tblDesc,
+ aliases, partDesc);
+ // set internal input format for all partition descriptors
+ partDesc.setInputFileFormatClass(internalInputFormat);
+ // Add the DP path to the list of input paths
+ inputPaths.add(path);
+ }
+
+ /**
+ * alter table ... concatenate
+ * <p/>
+ * If it is a skewed table, use the subdirectories as input paths.
+ */
+ public void resolveConcatenateMerge(HiveConf conf) {
+ isListBucketingAlterTableConcatenate =
+ ((listBucketingCtx == null) ? false : listBucketingCtx
+ .isSkewedStoredAsDir());
+ LOG.info("isListBucketingAlterTableConcatenate : " +
+ isListBucketingAlterTableConcatenate);
+ if (isListBucketingAlterTableConcatenate) {
+ // use sub-dir as inputpath.
+ assert ((this.inputPaths != null) && (this.inputPaths.size() == 1)) :
+ "alter table ... concatenate should only have one" +
+ " directory inside inputpaths";
+ Path dirPath = inputPaths.get(0);
+ try {
+ FileSystem inpFs = dirPath.getFileSystem(conf);
+ FileStatus[] status =
+ HiveStatsUtils.getFileStatusRecurse(dirPath, listBucketingCtx
+ .getSkewedColNames().size(), inpFs);
+ List<Path> newInputPath = new ArrayList<Path>();
+ boolean succeed = true;
+ for (int i = 0; i < status.length; ++i) {
+ if (status[i].isDir()) {
+ // Add the lb path to the list of input paths
+ newInputPath.add(status[i].getPath());
+ } else {
+            // found a file instead of a dir; don't change the input paths
+ succeed = false;
+ }
+ }
+        assert (succeed || ((!succeed) && newInputPath.isEmpty())) :
+            "This partition has an inconsistent file structure: it is " +
+            "stored as subdirectories but its files are not all at the " +
+            "same subdirectory depth.";
+ if (succeed) {
+ inputPaths.clear();
+ inputPaths.addAll(newInputPath);
+ }
+ } catch (IOException e) {
+        String msg =
+            "Failed to get filesystem for directory " + dirPath.toUri();
+ throw new RuntimeException(msg, e);
+ }
+
+ }
+ }
+
+ /**
+ * @return the listBucketingCtx
+ */
+ public ListBucketingCtx getListBucketingCtx() {
+ return listBucketingCtx;
+ }
+
+ /**
+ * @param listBucketingCtx the listBucketingCtx to set
+ */
+ public void setListBucketingCtx(ListBucketingCtx listBucketingCtx) {
+ this.listBucketingCtx = listBucketingCtx;
+ }
+
+ /**
+ * @return the isListBucketingAlterTableConcatenate
+ */
+ public boolean isListBucketingAlterTableConcatenate() {
+ return isListBucketingAlterTableConcatenate;
+ }
+
+ @Explain(displayName = "input format")
+ public String getSourceTableInputFormat() {
+ return srcTblInputFormat;
+ }
+
+ public void setSourceTableInputFormat(String srcTblInputFormat) {
+ this.srcTblInputFormat = srcTblInputFormat;
+ }
+
+ @Explain(displayName = "merge level")
+ public String getMergeLevel() {
+ if (srcTblInputFormat != null) {
+ if (srcTblInputFormat.equals(OrcInputFormat.class.getName())) {
+ return "stripe";
+ } else if (srcTblInputFormat.equals(RCFileInputFormat.class.getName())) {
+ return "block";
+ }
+ }
+ return null;
+ }
+}
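
A hedged sketch of how the new work object reports itself to EXPLAIN (the "merge level" and "input format" lines visible in the .q.out diffs further down); the paths are placeholders, here using an RCFile source so the merge level comes out as "block".

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;

public class MergeFileWorkSketch {
  public static void main(String[] args) {
    List<Path> inputs = Arrays.asList(new Path("/warehouse/rc_t/part=0"));
    MergeFileWork work = new MergeFileWork(inputs,
        new Path("/warehouse/rc_t/.merged"), RCFileInputFormat.class.getName());

    // Reads always go through CombineHiveInputFormat; the internal input
    // format (RCFileBlockMergeInputFormat here) is derived from the source.
    System.out.println(work.getInputformat()); // CombineHiveInputFormat
    System.out.println(work.getMergeLevel());  // "block" for RCFile input
  }
}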
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java Sat Sep 13 00:39:26 2014
@@ -18,16 +18,16 @@
package org.apache.hadoop.hive.ql.io.orc;
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-public class OrcFileStripeMergeInputFormat extends MergeInputFormat {
+import java.io.IOException;
+
+public class OrcFileStripeMergeInputFormat extends MergeFileInputFormat {
@Override
public RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> getRecordReader(
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java Sat Sep 13 00:39:26 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
/**
* The interface for writing ORC files.
@@ -72,4 +73,30 @@ public interface Writer {
* @return the offset that would be a valid end location for an ORC file
*/
long writeIntermediateFooter() throws IOException;
+
+ /**
+ * Fast stripe append to ORC file. This interface is used for fast merging
+ * of one ORC file into another. When merging, the caller should pass each
+ * stripe in binary form along with its stripe information and stripe
+ * statistics. After appending the last stripe of a file, use
+ * appendUserMetadata() to append any user metadata.
+ * @param stripe - stripe as byte array
+ * @param offset - offset within byte array
+ * @param length - length of stripe within byte array
+ * @param stripeInfo - stripe information
+ * @param stripeStatistics - stripe statistics (Protobuf objects can be
+ * merged directly)
+ * @throws IOException
+ */
+ public void appendStripe(byte[] stripe, int offset, int length,
+ StripeInformation stripeInfo,
+ OrcProto.StripeStatistics stripeStatistics) throws IOException;
+
+ /**
+ * When fast stripe append is used for merging ORC stripes, after appending
+ * the last stripe from a file, this method must be used to merge any
+ * user metadata.
+ * @param userMetadata - user metadata
+ */
+ public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata);
}
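
To make the intended call pattern concrete, here is a hedged sketch of a merge loop built on the two methods above. It is illustrative only: copyStripes and its stripeStats parameter are hypothetical, and in the actual merge path the raw stripe bytes and OrcProto.StripeStatistics arrive through OrcFileStripeMergeInputFormat's key/value wrappers rather than being read directly as below.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcProto;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
import org.apache.hadoop.hive.ql.io.orc.Writer;

public class StripeCopySketch {
  // Hypothetical helper: append every stripe of 'input' to 'writer'.
  // The per-stripe protobuf statistics are supplied by the caller.
  static void copyStripes(Configuration conf, Path input, Writer writer,
      List<OrcProto.StripeStatistics> stripeStats) throws IOException {
    FileSystem fs = input.getFileSystem(conf);
    Reader reader = OrcFile.createReader(fs, input);
    FSDataInputStream in = fs.open(input);
    try {
      int i = 0;
      for (StripeInformation stripe : reader.getStripes()) {
        byte[] buf = new byte[(int) stripe.getLength()];
        in.readFully(stripe.getOffset(), buf); // raw stripe bytes, no decode
        writer.appendStripe(buf, 0, buf.length, stripe, stripeStats.get(i++));
      }
    } finally {
      in.close();
    }
  }
}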
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Sat Sep 13 00:39:26 2014
@@ -29,6 +29,10 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -74,10 +78,17 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static com.google.common.base.Preconditions.checkArgument;
/**
* An ORC file writer. The file is divided into stripes, which is the natural
@@ -2316,17 +2327,19 @@ class WriterImpl implements Writer, Memo
return rawWriter.getPos();
}
- void appendStripe(byte[] stripe, StripeInformation stripeInfo,
- OrcProto.StripeStatistics stripeStatistics) throws IOException {
- appendStripe(stripe, 0, stripe.length, stripeInfo, stripeStatistics);
- }
-
- void appendStripe(byte[] stripe, int offset, int length,
+ @Override
+ public void appendStripe(byte[] stripe, int offset, int length,
StripeInformation stripeInfo,
OrcProto.StripeStatistics stripeStatistics) throws IOException {
+ checkArgument(stripe != null, "Stripe must not be null");
+ checkArgument(length <= stripe.length,
+ "Specified length must not be greater specified array length");
+ checkArgument(stripeInfo != null, "Stripe information must not be null");
+ checkArgument(stripeStatistics != null,
+ "Stripe statistics must not be null");
+
getStream();
long start = rawWriter.getPos();
-
long stripeLen = length;
long availBlockSpace = blockSize - (start % blockSize);
@@ -2382,7 +2395,8 @@ class WriterImpl implements Writer, Memo
}
}
- void appendUserMetadata(List<UserMetadataItem> userMetadata) {
+ @Override
+ public void appendUserMetadata(List<UserMetadataItem> userMetadata) {
if (userMetadata != null) {
for (UserMetadataItem item : userMetadata) {
this.userMetadata.put(item.getName(), item.getValue());
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java Sat Sep 13 00:39:26 2014
@@ -20,14 +20,14 @@ package org.apache.hadoop.hive.ql.io.rcf
import java.io.IOException;
-import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-public class RCFileBlockMergeInputFormat extends MergeInputFormat {
+public class RCFileBlockMergeInputFormat extends MergeFileInputFormat {
@Override
public RecordReader<RCFileKeyBufferWrapper, RCFileValueBufferWrapper>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Sat Sep 13 00:39:26 2014
@@ -18,20 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -53,6 +39,8 @@ import org.apache.hadoop.hive.ql.exec.No
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator;
+import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -65,8 +53,10 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
@@ -88,6 +78,7 @@ import org.apache.hadoop.hive.ql.plan.Co
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
@@ -96,8 +87,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
@@ -106,6 +99,22 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
/**
* General utility common functions for the Processor to convert operator into
@@ -1250,33 +1259,20 @@ public final class GenMapRedUtils {
(conf.getBoolVar(ConfVars.HIVEMERGEORCFILESTRIPELEVEL) &&
fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class))) {
- // Check if InputFormatClass is valid
- final String inputFormatClass;
- if (fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
- inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+ cplan = GenMapRedUtils.createMergeTask(fsInputDesc, finalName,
+ dpCtx != null && dpCtx.getNumDPCols() > 0);
+ if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+ cplan.setName("Tez Merge File Work");
+ ((TezWork) work).add(cplan);
} else {
- inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATSTRIPELEVEL);
- }
- try {
- Class c = Class.forName(inputFormatClass);
-
- if(fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class)) {
- LOG.info("OrcFile format - Using stripe level merge");
- } else {
- LOG.info("RCFile format- Using block level merge");
- }
- cplan = GenMapRedUtils.createMergeTask(fsInputDesc, finalName,
- dpCtx != null && dpCtx.getNumDPCols() > 0);
work = cplan;
- } catch (ClassNotFoundException e) {
- String msg = "Illegal input format class: " + inputFormatClass;
- throw new SemanticException(msg);
}
} else {
cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
- cplan.setName("Merge");
+ cplan.setName("Tez Merge File Work");
((TezWork)work).add(cplan);
} else {
work = new MapredWork();
@@ -1489,6 +1485,7 @@ public final class GenMapRedUtils {
*
* @param fsInputDesc
* @param finalName
+ * @param inputFormatClass
* @return MergeWork if table is stored as RCFile or ORCFile,
* null otherwise
*/
@@ -1498,38 +1495,62 @@ public final class GenMapRedUtils {
Path inputDir = fsInputDesc.getFinalDirName();
TableDesc tblDesc = fsInputDesc.getTableInfo();
- if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class) ||
- tblDesc.getInputFileFormatClass().equals(OrcInputFormat.class)) {
- ArrayList<Path> inputDirs = new ArrayList<Path>(1);
- ArrayList<String> inputDirstr = new ArrayList<String>(1);
- if (!hasDynamicPartitions
- && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
- inputDirs.add(inputDir);
- inputDirstr.add(inputDir.toString());
- }
-
- MergeWork work = new MergeWork(inputDirs, finalName,
- hasDynamicPartitions, fsInputDesc.getDynPartCtx(),
- tblDesc.getInputFileFormatClass());
- LinkedHashMap<String, ArrayList<String>> pathToAliases =
- new LinkedHashMap<String, ArrayList<String>>();
- pathToAliases.put(inputDir.toString(), (ArrayList<String>) inputDirstr.clone());
- work.setMapperCannotSpanPartns(true);
- work.setPathToAliases(pathToAliases);
- work.setAliasToWork(
- new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
- if (hasDynamicPartitions
- || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
- work.getPathToPartitionInfo().put(inputDir.toString(),
- new PartitionDesc(tblDesc, null));
- }
- work.setListBucketingCtx(fsInputDesc.getLbCtx());
+ List<Path> inputDirs = new ArrayList<Path>(1);
+ ArrayList<String> inputDirstr = new ArrayList<String>(1);
+ // this will be populated by MergeFileWork.resolveDynamicPartitionStoredAsSubDirsMerge
+ // in case of dynamic partitioning and list bucketing
+ if (!hasDynamicPartitions &&
+ !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+ inputDirs.add(inputDir);
+ }
+ inputDirstr.add(inputDir.toString());
+
+ // internal input format class for CombineHiveInputFormat
+ final Class<? extends InputFormat> internalIFClass;
+ if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+ internalIFClass = RCFileBlockMergeInputFormat.class;
+ } else if (tblDesc.getInputFileFormatClass().equals(OrcInputFormat.class)) {
+ internalIFClass = OrcFileStripeMergeInputFormat.class;
+ } else {
+ throw new SemanticException("createMergeTask called on a table with file"
+ + " format other than RCFile or ORCFile");
+ }
- return work;
+ // create the merge file work
+ MergeFileWork work = new MergeFileWork(inputDirs, finalName,
+ hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName());
+ LinkedHashMap<String, ArrayList<String>> pathToAliases =
+ new LinkedHashMap<String, ArrayList<String>>();
+ pathToAliases.put(inputDir.toString(), inputDirstr);
+ work.setMapperCannotSpanPartns(true);
+ work.setPathToAliases(pathToAliases);
+ PartitionDesc pDesc = new PartitionDesc(tblDesc, null);
+ pDesc.setInputFileFormatClass(internalIFClass);
+ work.getPathToPartitionInfo().put(inputDir.toString(), pDesc);
+ work.setListBucketingCtx(fsInputDesc.getLbCtx());
+
+ // create alias to work which contains the merge operator
+ LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork =
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+ Operator<? extends OperatorDesc> mergeOp = null;
+ final FileMergeDesc fmd;
+ if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+ fmd = new RCFileMergeDesc();
+ } else {
+ fmd = new OrcFileMergeDesc();
}
+ fmd.setDpCtx(fsInputDesc.getDynPartCtx());
+ fmd.setOutputPath(finalName);
+ fmd.setHasDynamicPartitions(work.hasDynamicPartitions());
+ fmd.setListBucketingAlterTableConcatenate(work.isListBucketingAlterTableConcatenate());
+ int lbLevel = work.getListBucketingCtx() == null ? 0 :
+ work.getListBucketingCtx().calculateListBucketingLevel();
+ fmd.setListBucketingDepth(lbLevel);
+ mergeOp = OperatorFactory.get(fmd);
+ aliasToWork.put(inputDir.toString(), mergeOp);
+ work.setAliasToWork(aliasToWork);
- throw new SemanticException("createMergeTask called on a table with file"
- + " format other than RCFile or ORCFile");
+ return work;
}
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Sat Sep 13 00:39:26 2014
@@ -404,6 +404,9 @@ public class TezCompiler extends TaskCom
}
private void setInputFormat(MapWork work, Operator<? extends OperatorDesc> op) {
+ if (op == null) {
+ return;
+ }
if (op.isUseBucketizedHiveInputFormat()) {
work.setUseBucketizedHiveInputFormat(true);
return;
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Generic descriptor for the fast file merge operators (ORC and RCFile).
+ */
+public class FileMergeDesc extends AbstractOperatorDesc {
+ private DynamicPartitionCtx dpCtx;
+ private Path outputPath;
+ private int listBucketingDepth;
+ private boolean hasDynamicPartitions;
+ private boolean isListBucketingAlterTableConcatenate;
+
+ public FileMergeDesc(DynamicPartitionCtx dynPartCtx, Path outputDir) {
+ this.dpCtx = dynPartCtx;
+ this.outputPath = outputDir;
+ }
+
+ public DynamicPartitionCtx getDpCtx() {
+ return dpCtx;
+ }
+
+ public void setDpCtx(DynamicPartitionCtx dpCtx) {
+ this.dpCtx = dpCtx;
+ }
+
+ public Path getOutputPath() {
+ return outputPath;
+ }
+
+ public void setOutputPath(Path outputPath) {
+ this.outputPath = outputPath;
+ }
+
+ public int getListBucketingDepth() {
+ return listBucketingDepth;
+ }
+
+ public void setListBucketingDepth(int listBucketingDepth) {
+ this.listBucketingDepth = listBucketingDepth;
+ }
+
+ public boolean hasDynamicPartitions() {
+ return hasDynamicPartitions;
+ }
+
+ public void setHasDynamicPartitions(boolean hasDynamicPartitions) {
+ this.hasDynamicPartitions = hasDynamicPartitions;
+ }
+
+ public boolean isListBucketingAlterTableConcatenate() {
+ return isListBucketingAlterTableConcatenate;
+ }
+
+ public void setListBucketingAlterTableConcatenate(boolean isListBucketingAlterTableConcatenate) {
+ this.isListBucketingAlterTableConcatenate = isListBucketingAlterTableConcatenate;
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OrcFileMergeDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OrcFileMergeDesc.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OrcFileMergeDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/OrcFileMergeDesc.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * ORC fast file merge operator descriptor.
+ */
+@Explain(displayName = "ORC File Merge Operator")
+public class OrcFileMergeDesc extends FileMergeDesc {
+
+ public OrcFileMergeDesc() {
+ this(null, null);
+ }
+
+ public OrcFileMergeDesc(DynamicPartitionCtx dpCtx, Path outPath) {
+ super(dpCtx, outPath);
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/RCFileMergeDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/RCFileMergeDesc.java?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/RCFileMergeDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/RCFileMergeDesc.java Sat Sep 13 00:39:26 2014
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * RCFile fast file merge operator descriptor.
+ */
+@Explain(displayName = "RCFile Merge Operator")
+public class RCFileMergeDesc extends FileMergeDesc {
+
+ public RCFileMergeDesc() {
+ this(null, null);
+ }
+
+ public RCFileMergeDesc(DynamicPartitionCtx dpCtx, Path outPath) {
+ super(dpCtx, outPath);
+ }
+
+}
Modified: hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q Sat Sep 13 00:39:26 2014
@@ -69,7 +69,6 @@ show partitions list_bucketing_dynamic_p
desc formatted list_bucketing_dynamic_part partition (ds='2008-04-08', hr='a1');
desc formatted list_bucketing_dynamic_part partition (ds='2008-04-08', hr='b1');
-set hive.merge.current.job.concatenate.list.bucketing=true;
-- concatenate the partition and it will merge files
alter table list_bucketing_dynamic_part partition (ds='2008-04-08', hr='b1') concatenate;
Modified: hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/orc_merge1.q Sat Sep 13 00:39:26 2014
@@ -1,51 +1,87 @@
set hive.merge.orcfile.stripe.level=false;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.optimize.sort.dynamic.partition=false;
+set mapred.min.split.size=1000;
+set mapred.max.split.size=2000;
+set tez.grouping.min-size=1000;
+set tez.grouping.max-size=2000;
+set hive.merge.tezfiles=false;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
DROP TABLE orcfile_merge1;
DROP TABLE orcfile_merge1b;
+DROP TABLE orcfile_merge1c;
CREATE TABLE orcfile_merge1 (key INT, value STRING)
PARTITIONED BY (ds STRING, part STRING) STORED AS ORC;
CREATE TABLE orcfile_merge1b (key INT, value STRING)
PARTITIONED BY (ds STRING, part STRING) STORED AS ORC;
+CREATE TABLE orcfile_merge1c (key INT, value STRING)
+ PARTITIONED BY (ds STRING, part STRING) STORED AS ORC;
--- Use non stipe-level merge
+-- merge disabled
EXPLAIN
INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part)
- SELECT key, value, PMOD(HASH(key), 100) as part
+ SELECT key, value, PMOD(HASH(key), 2) as part
FROM src;
INSERT OVERWRITE TABLE orcfile_merge1 PARTITION (ds='1', part)
- SELECT key, value, PMOD(HASH(key), 100) as part
+ SELECT key, value, PMOD(HASH(key), 2) as part
FROM src;
-DESC FORMATTED orcfile_merge1 partition (ds='1', part='50');
+DESC FORMATTED orcfile_merge1 partition (ds='1', part='0');
-set hive.merge.orcfile.stripe.level=true;
+set hive.merge.tezfiles=true;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=true;
+-- auto-merge slow way
EXPLAIN
INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
- SELECT key, value, PMOD(HASH(key), 100) as part
+ SELECT key, value, PMOD(HASH(key), 2) as part
FROM src;
INSERT OVERWRITE TABLE orcfile_merge1b PARTITION (ds='1', part)
- SELECT key, value, PMOD(HASH(key), 100) as part
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src;
+
+DESC FORMATTED orcfile_merge1b partition (ds='1', part='0');
+
+set hive.merge.orcfile.stripe.level=true;
+-- auto-merge fast way
+EXPLAIN
+ INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
+ FROM src;
+
+INSERT OVERWRITE TABLE orcfile_merge1c PARTITION (ds='1', part)
+ SELECT key, value, PMOD(HASH(key), 2) as part
FROM src;
-DESC FORMATTED orcfile_merge1 partition (ds='1', part='50');
+DESC FORMATTED orcfile_merge1c partition (ds='1', part='0');
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
-- Verify
SELECT SUM(HASH(c)) FROM (
SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
FROM orcfile_merge1 WHERE ds='1'
) t;
-set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
-
SELECT SUM(HASH(c)) FROM (
SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
FROM orcfile_merge1b WHERE ds='1'
) t;
+SELECT SUM(HASH(c)) FROM (
+ SELECT TRANSFORM(*) USING 'tr \t _' AS (c)
+ FROM orcfile_merge1c WHERE ds='1'
+) t;
+
+select count(*) from orcfile_merge1;
+select count(*) from orcfile_merge1b;
+select count(*) from orcfile_merge1c;
+
DROP TABLE orcfile_merge1;
DROP TABLE orcfile_merge1b;
+DROP TABLE orcfile_merge1c;
Added: hive/trunk/ql/src/test/queries/clientpositive/orc_merge5.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_merge5.q?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/orc_merge5.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/orc_merge5.q Sat Sep 13 00:39:26 2014
@@ -0,0 +1,61 @@
+-- SORT_QUERY_RESULTS
+
+create table orc_merge5 (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) stored as orc;
+create table orc_merge5b (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) stored as orc;
+
+load data local inpath '../../data/files/orc_split_elim.orc' into table orc_merge5;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+SET mapred.min.split.size=1000;
+SET mapred.max.split.size=50000;
+SET hive.optimize.index.filter=true;
+set hive.merge.orcfile.stripe.level=false;
+set hive.merge.tezfiles=false;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.compute.splits.in.am=true;
+set tez.grouping.min-size=1000;
+set tez.grouping.max-size=50000;
+
+-- 3 mappers
+explain insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+
+-- 3 files total
+analyze table orc_merge5b compute statistics noscan;
+desc formatted orc_merge5b;
+select * from orc_merge5b;
+
+set hive.merge.orcfile.stripe.level=true;
+set hive.merge.tezfiles=true;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=true;
+
+-- 3 mappers
+explain insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+
+-- 1 file after merging
+analyze table orc_merge5b compute statistics noscan;
+desc formatted orc_merge5b;
+select * from orc_merge5b;
+
+set hive.merge.orcfile.stripe.level=false;
+set hive.merge.tezfiles=false;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+
+insert overwrite table orc_merge5b select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+analyze table orc_merge5b compute statistics noscan;
+desc formatted orc_merge5b;
+select * from orc_merge5b;
+
+set hive.merge.orcfile.stripe.level=true;
+explain alter table orc_merge5b concatenate;
+alter table orc_merge5b concatenate;
+
+-- 1 file after merging
+analyze table orc_merge5b compute statistics noscan;
+desc formatted orc_merge5b;
+select * from orc_merge5b;
+
Added: hive/trunk/ql/src/test/queries/clientpositive/orc_merge6.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_merge6.q?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/orc_merge6.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/orc_merge6.q Sat Sep 13 00:39:26 2014
@@ -0,0 +1,78 @@
+-- SORT_QUERY_RESULTS
+
+-- orc file merge tests for static partitions
+create table orc_merge5 (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) stored as orc;
+create table orc_merge5a (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) partitioned by (year string, hour int) stored as orc;
+
+load data local inpath '../../data/files/orc_split_elim.orc' into table orc_merge5;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+SET mapred.min.split.size=1000;
+SET mapred.max.split.size=50000;
+SET hive.optimize.index.filter=true;
+set hive.merge.orcfile.stripe.level=false;
+set hive.merge.tezfiles=false;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.compute.splits.in.am=true;
+set tez.grouping.min-size=1000;
+set tez.grouping.max-size=50000;
+
+-- 3 mappers
+explain insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+insert overwrite table orc_merge5a partition (year="2001",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+
+-- 3 files total
+analyze table orc_merge5a partition(year="2000",hour=24) compute statistics noscan;
+analyze table orc_merge5a partition(year="2001",hour=24) compute statistics noscan;
+desc formatted orc_merge5a partition(year="2000",hour=24);
+desc formatted orc_merge5a partition(year="2001",hour=24);
+show partitions orc_merge5a;
+select * from orc_merge5a;
+
+set hive.merge.orcfile.stripe.level=true;
+set hive.merge.tezfiles=true;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=true;
+
+-- 3 mappers
+explain insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+insert overwrite table orc_merge5a partition (year="2001",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+
+-- 1 file after merging
+analyze table orc_merge5a partition(year="2000",hour=24) compute statistics noscan;
+analyze table orc_merge5a partition(year="2001",hour=24) compute statistics noscan;
+desc formatted orc_merge5a partition(year="2000",hour=24);
+desc formatted orc_merge5a partition(year="2001",hour=24);
+show partitions orc_merge5a;
+select * from orc_merge5a;
+
+set hive.merge.orcfile.stripe.level=false;
+set hive.merge.tezfiles=false;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+
+insert overwrite table orc_merge5a partition (year="2000",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+insert overwrite table orc_merge5a partition (year="2001",hour=24) select userid,string1,subtype,decimal1,ts from orc_merge5 where userid<=13;
+analyze table orc_merge5a partition(year="2000",hour=24) compute statistics noscan;
+analyze table orc_merge5a partition(year="2001",hour=24) compute statistics noscan;
+desc formatted orc_merge5a partition(year="2000",hour=24);
+desc formatted orc_merge5a partition(year="2001",hour=24);
+show partitions orc_merge5a;
+select * from orc_merge5a;
+
+set hive.merge.orcfile.stripe.level=true;
+explain alter table orc_merge5a partition(year="2000",hour=24) concatenate;
+alter table orc_merge5a partition(year="2000",hour=24) concatenate;
+alter table orc_merge5a partition(year="2001",hour=24) concatenate;
+
+-- 1 file after merging
+analyze table orc_merge5a partition(year="2000",hour=24) compute statistics noscan;
+analyze table orc_merge5a partition(year="2001",hour=24) compute statistics noscan;
+desc formatted orc_merge5a partition(year="2000",hour=24);
+desc formatted orc_merge5a partition(year="2001",hour=24);
+show partitions orc_merge5a;
+select * from orc_merge5a;
+
Added: hive/trunk/ql/src/test/queries/clientpositive/orc_merge7.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/orc_merge7.q?rev=1624688&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/orc_merge7.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/orc_merge7.q Sat Sep 13 00:39:26 2014
@@ -0,0 +1,82 @@
+-- SORT_QUERY_RESULTS
+
+-- orc merge file tests for dynamic partition case
+
+create table orc_merge5 (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) stored as orc;
+create table orc_merge5a (userid bigint, string1 string, subtype double, decimal1 decimal, ts timestamp) partitioned by (st double) stored as orc;
+
+load data local inpath '../../data/files/orc_split_elim.orc' into table orc_merge5;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+SET mapred.min.split.size=1000;
+SET mapred.max.split.size=50000;
+SET hive.optimize.index.filter=true;
+set hive.merge.orcfile.stripe.level=false;
+set hive.merge.tezfiles=false;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.compute.splits.in.am=true;
+set tez.grouping.min-size=1000;
+set tez.grouping.max-size=50000;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.optimize.sort.dynamic.partition=false;
+
+-- 3 mappers
+explain insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
+insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
+insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
+
+-- 3 files total
+analyze table orc_merge5a partition(st=80.0) compute statistics noscan;
+analyze table orc_merge5a partition(st=0.8) compute statistics noscan;
+desc formatted orc_merge5a partition(st=80.0);
+desc formatted orc_merge5a partition(st=0.8);
+show partitions orc_merge5a;
+select * from orc_merge5a where userid<=13;
+
+set hive.merge.orcfile.stripe.level=true;
+set hive.merge.tezfiles=true;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=true;
+
+-- 3 mappers
+explain insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
+insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
+insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
+
+-- 1 file after merging
+analyze table orc_merge5a partition(st=80.0) compute statistics noscan;
+analyze table orc_merge5a partition(st=0.8) compute statistics noscan;
+desc formatted orc_merge5a partition(st=80.0);
+desc formatted orc_merge5a partition(st=0.8);
+show partitions orc_merge5a;
+select * from orc_merge5a where userid<=13;
+
+set hive.merge.orcfile.stripe.level=false;
+set hive.merge.tezfiles=false;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+
+insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
+insert overwrite table orc_merge5a partition (st) select userid,string1,subtype,decimal1,ts,subtype from orc_merge5;
+analyze table orc_merge5a partition(st=80.0) compute statistics noscan;
+analyze table orc_merge5a partition(st=0.8) compute statistics noscan;
+desc formatted orc_merge5a partition(st=80.0);
+desc formatted orc_merge5a partition(st=0.8);
+show partitions orc_merge5a;
+select * from orc_merge5a where userid<=13;
+
+set hive.merge.orcfile.stripe.level=true;
+explain alter table orc_merge5a partition(st=80.0) concatenate;
+alter table orc_merge5a partition(st=80.0) concatenate;
+alter table orc_merge5a partition(st=0.8) concatenate;
+
+-- 1 file after merging
+analyze table orc_merge5a partition(st=80.0) compute statistics noscan;
+analyze table orc_merge5a partition(st=0.8) compute statistics noscan;
+desc formatted orc_merge5a partition(st=80.0);
+desc formatted orc_merge5a partition(st=0.8);
+show partitions orc_merge5a;
+select * from orc_merge5a where userid<=13;
+
Modified: hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out Sat Sep 13 00:39:26 2014
@@ -566,12 +566,16 @@ STAGE PLANS:
Stats-Aggr Operator
Stage: Stage-4
- Merge Work
+ Merge File Operator
+ Map Operator Tree:
+ RCFile Merge Operator
merge level: block
input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
Stage: Stage-6
- Merge Work
+ Merge File Operator
+ Map Operator Tree:
+ RCFile Merge Operator
merge level: block
input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
Modified: hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
Files hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_10.q.out Sat Sep 13 00:39:26 2014 differ
Modified: hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
Files hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out Sat Sep 13 00:39:26 2014 differ
Modified: hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
Files hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out Sat Sep 13 00:39:26 2014 differ
Modified: hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
Files hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out Sat Sep 13 00:39:26 2014 differ
Modified: hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
Files hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out Sat Sep 13 00:39:26 2014 differ
Modified: hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out Sat Sep 13 00:39:26 2014
@@ -202,12 +202,16 @@ STAGE PLANS:
Stats-Aggr Operator
Stage: Stage-3
- Merge Work
+ Merge File Operator
+ Map Operator Tree:
+ RCFile Merge Operator
merge level: block
input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
Stage: Stage-5
- Merge Work
+ Merge File Operator
+ Map Operator Tree:
+ RCFile Merge Operator
merge level: block
input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
Modified: hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out Sat Sep 13 00:39:26 2014
@@ -176,12 +176,16 @@ STAGE PLANS:
Stats-Aggr Operator
Stage: Stage-3
- Merge Work
+ Merge File Operator
+ Map Operator Tree:
+ RCFile Merge Operator
merge level: block
input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
Stage: Stage-5
- Merge Work
+ Merge File Operator
+ Map Operator Tree:
+ RCFile Merge Operator
merge level: block
input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
Modified: hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out?rev=1624688&r1=1624687&r2=1624688&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/orc_createas1.q.out Sat Sep 13 00:39:26 2014
@@ -111,12 +111,16 @@ STAGE PLANS:
Stats-Aggr Operator
Stage: Stage-3
- Merge Work
+ Merge File Operator
+ Map Operator Tree:
+ ORC File Merge Operator
merge level: stripe
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
Stage: Stage-5
- Merge Work
+ Merge File Operator
+ Map Operator Tree:
+ ORC File Merge Operator
merge level: stripe
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
@@ -270,12 +274,16 @@ STAGE PLANS:
Stats-Aggr Operator
Stage: Stage-3
- Merge Work
+ Merge File Operator
+ Map Operator Tree:
+ ORC File Merge Operator
merge level: stripe
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
Stage: Stage-5
- Merge Work
+ Merge File Operator
+ Map Operator Tree:
+ ORC File Merge Operator
merge level: stripe
input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat