You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2014/09/19 20:38:51 UTC
svn commit: r1626292 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/io/
java/org/apache/hadoop/hive/ql/io/orc/ test/org/apache/hadoop/hive/ql/exec/
test/org/apache/hadoop/hive/ql/io/orc/
Author: omalley
Date: Fri Sep 19 18:38:51 2014
New Revision: 1626292
URL: http://svn.apache.org/r1626292
Log:
HIVE-7812. Disable CombineHiveInputFormat for ACID tables. (omalley reviewed
by Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1626292&r1=1626291&r2=1626292&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Fri Sep 19 18:38:51 2014
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,10 +35,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.FooterBuffer;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -49,7 +48,6 @@ import org.apache.hadoop.hive.ql.plan.Fe
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -61,11 +59,8 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
@@ -83,6 +78,9 @@ public class FetchOperator implements Se
static Log LOG = LogFactory.getLog(FetchOperator.class.getName());
static LogHelper console = new LogHelper(LOG);
+ public static final String FETCH_OPERATOR_DIRECTORY_LIST =
+ "hive.complete.dir.list";
+
private boolean isNativeTable;
private FetchWork work;
protected Operator<?> operator; // operator tree for processing row further (option)
@@ -353,6 +351,7 @@ public class FetchOperator implements Se
}
return;
} else {
+ setFetchOperatorContext(job, work.getPartDir());
iterPath = work.getPartDir().iterator();
iterPartDesc = work.getPartDesc().iterator();
}
@@ -381,6 +380,30 @@ public class FetchOperator implements Se
}
/**
+ * Set context for this fetch operator in to the jobconf.
+ * This helps InputFormats make decisions based on the scope of the complete
+ * operation.
+ * @param conf the configuration to modify
+ * @param partDirs the list of partition directories
+ */
+ static void setFetchOperatorContext(JobConf conf,
+ ArrayList<Path> partDirs) {
+ if (partDirs != null) {
+ StringBuilder buff = new StringBuilder();
+ boolean first = true;
+ for(Path p: partDirs) {
+ if (first) {
+ first = false;
+ } else {
+ buff.append('\t');
+ }
+ buff.append(StringEscapeUtils.escapeJava(p.toString()));
+ }
+ conf.set(FETCH_OPERATOR_DIRECTORY_LIST, buff.toString());
+ }
+ }
+
+ /**
* A cache of Object Inspector Settable Properties.
*/
private static Map<ObjectInspector, Boolean> oiSettableProperties = new HashMap<ObjectInspector, Boolean>();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1626292&r1=1626291&r2=1626292&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Sep 19 18:38:51 2014
@@ -697,6 +697,7 @@ public class SMBMapJoinOperator extends
// But if hive supports assigning bucket number for each partition, this can be vary
public void setupContext(List<Path> paths) throws HiveException {
int segmentLen = paths.size();
+ FetchOperator.setFetchOperatorContext(jobConf, fetchWork.getPartDir());
FetchOperator[] segments = segmentsForSize(segmentLen);
for (int i = 0 ; i < segmentLen; i++) {
Path path = paths.get(i);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1626292&r1=1626291&r2=1626292&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Sep 19 18:38:51 2014
@@ -27,6 +27,7 @@ import org.antlr.runtime.CommonToken;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.WordUtils;
+import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -2275,13 +2276,13 @@ public final class Utilities {
* configuration which receives configured properties
*/
public static void copyTableJobPropertiesToConf(TableDesc tbl, JobConf job) {
- String bucketString = tbl.getProperties()
- .getProperty(hive_metastoreConstants.BUCKET_COUNT);
- // copy the bucket count
- if (bucketString != null) {
- job.set(hive_metastoreConstants.BUCKET_COUNT, bucketString);
+ Properties tblProperties = tbl.getProperties();
+ for(String name: tblProperties.stringPropertyNames()) {
+ if (job.get(name) == null) {
+ job.set(name,
+ StringEscapeUtils.escapeJava((String) tblProperties.get(name)));
+ }
}
-
Map<String, String> jobProperties = tbl.getJobProperties();
if (jobProperties == null) {
return;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1626292&r1=1626291&r2=1626292&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Fri Sep 19 18:38:51 2014
@@ -318,7 +318,7 @@ public class AcidUtils {
String filename = file.getPath().getName();
if (filename.startsWith(BASE_PREFIX) ||
filename.startsWith(DELTA_PREFIX)) {
- if (file.isDirectory()) {
+ if (file.isDir()) {
return true;
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=1626292&r1=1626291&r2=1626292&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Fri Sep 19 18:38:51 2014
@@ -122,24 +122,7 @@ public class BucketizedHiveInputFormat<K
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
init(job);
- Path[] dirs = FileInputFormat.getInputPaths(job);
- if (dirs.length == 0) {
- // on tez we're avoiding to duplicate the file info in FileInputFormat.
- if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
- try {
- List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
- dirs = paths.toArray(new Path[paths.size()]);
- if (dirs.length == 0) {
- // if we still don't have any files it's time to fail.
- throw new IOException("No input paths specified in job");
- }
- } catch (Exception e) {
- throw new IOException("Could not create input paths", e);
- }
- } else {
- throw new IOException("No input paths specified in job");
- }
- }
+ Path[] dirs = getInputPaths(job);
JobConf newjob = new JobConf(job);
ArrayList<InputSplit> result = new ArrayList<InputSplit>();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1626292&r1=1626291&r2=1626292&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Fri Sep 19 18:38:51 2014
@@ -33,6 +33,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -264,8 +265,8 @@ public class CombineHiveInputFormat<K ex
/**
* Create Hive splits based on CombineFileSplit.
*/
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ private InputSplit[] getCombineSplits(JobConf job,
+ int numSplits) throws IOException {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
init(job);
@@ -274,17 +275,6 @@ public class CombineHiveInputFormat<K ex
mrwork.getAliasToWork();
CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
.getCombineFileInputFormat();
-
- // on tez we're avoiding duplicating path info since the info will go over
- // rpc
- if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
- try {
- List<Path> dirs = Utilities.getInputPathsTez(job, mrwork);
- Utilities.setInputPaths(job, dirs);
- } catch (Exception e) {
- throw new IOException("Could not create input paths", e);
- }
- }
InputSplit[] splits = null;
if (combine == null) {
@@ -327,13 +317,6 @@ public class CombineHiveInputFormat<K ex
// ignore
}
FileSystem inpFs = path.getFileSystem(job);
- if (inputFormatClass.isAssignableFrom(OrcInputFormat.class)) {
- if (inpFs.exists(new Path(path, OrcRecordUpdater.ACID_FORMAT))) {
- throw new IOException("CombineHiveInputFormat is incompatible " +
- " with ACID tables. Please set hive.input.format=" +
- "org.apache.hadoop.hive.ql.io.HiveInputFormat");
- }
- }
// Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
// we use a configuration variable for the same
@@ -461,6 +444,84 @@ public class CombineHiveInputFormat<K ex
return result.toArray(new CombineHiveInputSplit[result.size()]);
}
+ /**
+ * Create Hive splits based on CombineFileSplit.
+ */
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ init(job);
+ Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
+ Map<String, Operator<? extends OperatorDesc>> aliasToWork =
+ mrwork.getAliasToWork();
+
+ ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+ Path[] paths = getInputPaths(job);
+
+ List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2);
+ List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2);
+
+ for (Path path : paths) {
+
+ PartitionDesc part =
+ HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+ pathToPartitionInfo, path,
+ IOPrepareCache.get().allocatePartitionDescMap());
+
+ // Use HiveInputFormat if any of the paths is not splittable
+ Class inputFormatClass = part.getInputFileFormatClass();
+ String inputFormatClassName = inputFormatClass.getName();
+ InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+ if (inputFormat instanceof AvoidSplitCombination &&
+ ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The split [" + path +
+ "] is being parked for HiveInputFormat.getSplits");
+ }
+ nonCombinablePaths.add(path);
+ } else {
+ combinablePaths.add(path);
+ }
+ }
+
+ // Store the previous value for the path specification
+ String oldPaths = job.get(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The received input paths are: [" + oldPaths +
+ "] against the property "
+ + HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname);
+ }
+
+ // Process the normal splits
+ if (nonCombinablePaths.size() > 0) {
+ FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray
+ (new Path[nonCombinablePaths.size()]));
+ InputSplit[] splits = super.getSplits(job, numSplits);
+ for (InputSplit split : splits) {
+ result.add(split);
+ }
+ }
+
+ // Process the combine splits
+ if (combinablePaths.size() > 0) {
+ FileInputFormat.setInputPaths(job, combinablePaths.toArray
+ (new Path[combinablePaths.size()]));
+ InputSplit[] splits = getCombineSplits(job, numSplits);
+ for (InputSplit split : splits) {
+ result.add(split);
+ }
+ }
+
+ // Restore the old path information back
+ // This is just to prevent incompatibilities with previous versions Hive
+ // if some application depends on the original value being set.
+ if (oldPaths != null) {
+ job.set(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname, oldPaths);
+ }
+ LOG.info("Number of all splits " + result.size());
+ return result.toArray(new InputSplit[result.size()]);
+ }
+
private void processPaths(JobConf job, CombineFileInputFormatShim combine,
List<InputSplitShim> iss, Path... path) throws IOException {
JobConf currJob = new JobConf(job);
@@ -635,4 +696,12 @@ public class CombineHiveInputFormat<K ex
return s.toString();
}
}
+
+ /**
+ * This is a marker interface that is used to identify the formats where
+ * combine split generation is not applicable
+ */
+ public interface AvoidSplitCombination {
+ boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1626292&r1=1626291&r2=1626292&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Fri Sep 19 18:38:51 2014
@@ -295,11 +295,7 @@ public class HiveInputFormat<K extends W
}
}
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- PerfLogger perfLogger = PerfLogger.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
- init(job);
-
+ Path[] getInputPaths(JobConf job) throws IOException {
Path[] dirs = FileInputFormat.getInputPaths(job);
if (dirs.length == 0) {
// on tez we're avoiding to duplicate the file info in FileInputFormat.
@@ -314,6 +310,14 @@ public class HiveInputFormat<K extends W
throw new IOException("No input paths specified in job");
}
}
+ return dirs;
+ }
+
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
+ init(job);
+ Path[] dirs = getInputPaths(job);
JobConf newjob = new JobConf(job);
List<InputSplit> result = new ArrayList<InputSplit>();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1626292&r1=1626291&r2=1626292&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Fri Sep 19 18:38:51 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
@@ -101,7 +102,8 @@ import com.google.common.util.concurrent
*/
public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
InputFormatChecker, VectorizedInputFormatInterface,
- AcidInputFormat<NullWritable, OrcStruct> {
+ AcidInputFormat<NullWritable, OrcStruct>,
+ CombineHiveInputFormat.AvoidSplitCombination {
private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
@@ -127,6 +129,12 @@ public class OrcInputFormat implements
*/
private static final double MIN_INCLUDED_LOCATION = 0.80;
+ @Override
+ public boolean shouldSkipCombine(Path path,
+ Configuration conf) throws IOException {
+ return AcidUtils.isAcid(path, conf);
+ }
+
private static class OrcRecordReader
implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>,
StatsProvidingRecordReader {
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java?rev=1626292&r1=1626291&r2=1626292&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java Fri Sep 19 18:38:51 2014
@@ -18,17 +18,24 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
@@ -42,6 +49,10 @@ import org.apache.hadoop.hive.ql.plan.Pl
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.processors.CommandProcessor;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -49,8 +60,14 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.junit.Test;
/**
* TestOperators.
@@ -274,7 +291,7 @@ public class TestOperators extends TestC
cd, sop);
op.initialize(new JobConf(TestOperators.class),
- new ObjectInspector[] {r[0].oi});
+ new ObjectInspector[]{r[0].oi});
// evaluate on row
for (int i = 0; i < 5; i++) {
@@ -379,4 +396,82 @@ public class TestOperators extends TestC
throw (e);
}
}
+
+ @Test
+ public void testFetchOperatorContextQuoting() throws Exception {
+ JobConf conf = new JobConf();
+ ArrayList<Path> list = new ArrayList<Path>();
+ list.add(new Path("hdfs://nn.example.com/fi\tl\\e\t1"));
+ list.add(new Path("hdfs://nn.example.com/file\t2"));
+ list.add(new Path("file:/file3"));
+ FetchOperator.setFetchOperatorContext(conf, list);
+ String[] parts =
+ conf.get(FetchOperator.FETCH_OPERATOR_DIRECTORY_LIST).split("\t");
+ assertEquals(3, parts.length);
+ assertEquals("hdfs://nn.example.com/fi\\tl\\\\e\\t1", parts[0]);
+ assertEquals("hdfs://nn.example.com/file\\t2", parts[1]);
+ assertEquals("file:/file3", parts[2]);
+ }
+
+ /**
+ * A custom input format that checks to make sure that the fetch operator
+ * sets the required attributes.
+ */
+ public static class CustomInFmt extends TextInputFormat {
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int splits) throws IOException {
+
+ // ensure that the table properties were copied
+ assertEquals("val1", job.get("myprop1"));
+ assertEquals("val2", job.get("myprop2"));
+
+ // ensure that both of the partitions are in the complete list.
+ String[] dirs = job.get("hive.complete.dir.list").split("\t");
+ assertEquals(2, dirs.length);
+ assertEquals(true, dirs[0].endsWith("/state=CA"));
+ assertEquals(true, dirs[1].endsWith("/state=OR"));
+ return super.getSplits(job, splits);
+ }
+ }
+
+ @Test
+ public void testFetchOperatorContext() throws Exception {
+ HiveConf conf = new HiveConf();
+ conf.set("hive.support.concurrency", "false");
+ SessionState.start(conf);
+ String cmd = "create table fetchOp (id int, name string) " +
+ "partitioned by (state string) " +
+ "row format delimited fields terminated by '|' " +
+ "stored as " +
+ "inputformat 'org.apache.hadoop.hive.ql.exec.TestOperators$CustomInFmt' " +
+ "outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " +
+ "tblproperties ('myprop1'='val1', 'myprop2' = 'val2')";
+ Driver driver = new Driver();
+ driver.init();
+ CommandProcessorResponse response = driver.run(cmd);
+ assertEquals(0, response.getResponseCode());
+ List<Object> result = new ArrayList<Object>();
+
+ cmd = "load data local inpath '../data/files/employee.dat' " +
+ "overwrite into table fetchOp partition (state='CA')";
+ driver.init();
+ response = driver.run(cmd);
+ assertEquals(0, response.getResponseCode());
+
+ cmd = "load data local inpath '../data/files/employee2.dat' " +
+ "overwrite into table fetchOp partition (state='OR')";
+ driver.init();
+ response = driver.run(cmd);
+ assertEquals(0, response.getResponseCode());
+
+ cmd = "select * from fetchOp";
+ driver.init();
+ driver.setMaxRows(500);
+ response = driver.run(cmd);
+ assertEquals(0, response.getResponseCode());
+ driver.getResults(result);
+ assertEquals(20, result.size());
+ driver.close();
+ }
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1626292&r1=1626291&r2=1626292&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Fri Sep 19 18:38:51 2014
@@ -118,7 +118,6 @@ public class TestInputOutputFormat {
TimeZone gmt = TimeZone.getTimeZone("GMT+0");
DATE_FORMAT.setTimeZone(gmt);
TIME_FORMAT.setTimeZone(gmt);
- TimeZone local = TimeZone.getDefault();
}
public static class BigRow implements Writable {
@@ -560,6 +559,12 @@ public class TestInputOutputFormat {
this.file = file;
}
+ /**
+ * Set the blocks and their location for the file.
+ * Must be called after the stream is closed or the block length will be
+ * wrong.
+ * @param blocks the list of blocks
+ */
public void setBlocks(MockBlock... blocks) {
file.blocks = blocks;
int offset = 0;
@@ -580,12 +585,18 @@ public class TestInputOutputFormat {
file.content = new byte[file.length];
System.arraycopy(buf.getData(), 0, file.content, 0, file.length);
}
+
+ @Override
+ public String toString() {
+ return "Out stream to " + file.toString();
+ }
}
public static class MockFileSystem extends FileSystem {
final List<MockFile> files = new ArrayList<MockFile>();
Path workingDir = new Path("/");
+ @SuppressWarnings("unused")
public MockFileSystem() {
// empty
}
@@ -620,7 +631,7 @@ public class TestInputOutputFormat {
return new FSDataInputStream(new MockInputStream(file));
}
}
- return null;
+ throw new IOException("File not found: " + path);
}
@Override
@@ -743,8 +754,12 @@ public class TestInputOutputFormat {
for(MockBlock block: file.blocks) {
if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
block.length, start, len) > 0) {
+ String[] topology = new String[block.hosts.length];
+ for(int i=0; i < topology.length; ++i) {
+ topology[i] = "/rack/ " + block.hosts[i];
+ }
result.add(new BlockLocation(block.hosts, block.hosts,
- block.offset, block.length));
+ topology, block.offset, block.length));
}
}
return result.toArray(new BlockLocation[result.size()]);
@@ -1209,7 +1224,8 @@ public class TestInputOutputFormat {
Path warehouseDir,
String tableName,
ObjectInspector objectInspector,
- boolean isVectorized
+ boolean isVectorized,
+ int partitions
) throws IOException {
Utilities.clearWorkMap();
JobConf conf = new JobConf();
@@ -1218,9 +1234,20 @@ public class TestInputOutputFormat {
conf.set("hive.vectorized.execution.enabled", Boolean.toString(isVectorized));
conf.set("fs.mock.impl", MockFileSystem.class.getName());
conf.set("mapred.mapper.class", ExecMapper.class.getName());
- Path root = new Path(warehouseDir, tableName + "/p=0");
+ Path root = new Path(warehouseDir, tableName);
+ // clean out previous contents
((MockFileSystem) root.getFileSystem(conf)).clear();
- conf.set("mapred.input.dir", root.toString());
+ // build partition strings
+ String[] partPath = new String[partitions];
+ StringBuilder buffer = new StringBuilder();
+ for(int p=0; p < partitions; ++p) {
+ partPath[p] = new Path(root, "p=" + p).toString();
+ if (p != 0) {
+ buffer.append(',');
+ }
+ buffer.append(partPath[p]);
+ }
+ conf.set("mapred.input.dir", buffer.toString());
StringBuilder columnIds = new StringBuilder();
StringBuilder columnNames = new StringBuilder();
StringBuilder columnTypes = new StringBuilder();
@@ -1249,9 +1276,6 @@ public class TestInputOutputFormat {
tblProps.put("columns.types", columnTypes.toString());
TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class,
tblProps);
- LinkedHashMap<String, String> partSpec =
- new LinkedHashMap<String, String>();
- PartitionDesc part = new PartitionDesc(tbl, partSpec);
MapWork mapWork = new MapWork();
mapWork.setVectorMode(isVectorized);
@@ -1260,11 +1284,16 @@ public class TestInputOutputFormat {
new LinkedHashMap<String, ArrayList<String>>();
ArrayList<String> aliases = new ArrayList<String>();
aliases.add(tableName);
- aliasMap.put(root.toString(), aliases);
- mapWork.setPathToAliases(aliasMap);
LinkedHashMap<String, PartitionDesc> partMap =
new LinkedHashMap<String, PartitionDesc>();
- partMap.put(root.toString(), part);
+ for(int p=0; p < partitions; ++p) {
+ aliasMap.put(partPath[p], aliases);
+ LinkedHashMap<String, String> partSpec =
+ new LinkedHashMap<String, String>();
+ PartitionDesc part = new PartitionDesc(tbl, partSpec);
+ partMap.put(partPath[p], part);
+ }
+ mapWork.setPathToAliases(aliasMap);
mapWork.setPathToPartitionInfo(partMap);
mapWork.setScratchColumnMap(new HashMap<String, Map<String, Integer>>());
mapWork.setScratchColumnVectorTypes(new HashMap<String,
@@ -1285,6 +1314,7 @@ public class TestInputOutputFormat {
* @throws Exception
*/
@Test
+ @SuppressWarnings("unchecked")
public void testVectorization() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
@@ -1294,7 +1324,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorization", inspector, true);
+ "vectorization", inspector, true, 1);
// write the orc file to the mock file system
Writer writer =
@@ -1332,6 +1362,7 @@ public class TestInputOutputFormat {
* @throws Exception
*/
@Test
+ @SuppressWarnings("unchecked")
public void testVectorizationWithBuckets() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
@@ -1341,7 +1372,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorBuckets", inspector, true);
+ "vectorBuckets", inspector, true, 1);
// write the orc file to the mock file system
Writer writer =
@@ -1377,10 +1408,11 @@ public class TestInputOutputFormat {
// test acid with vectorization, no combine
@Test
+ @SuppressWarnings("unchecked")
public void testVectorizationWithAcid() throws Exception {
StructObjectInspector inspector = new BigRowInspector();
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorizationAcid", inspector, true);
+ "vectorizationAcid", inspector, true, 1);
// write the orc file to the mock file system
Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1444,6 +1476,7 @@ public class TestInputOutputFormat {
// test non-vectorized, non-acid, combine
@Test
+ @SuppressWarnings("unchecked")
public void testCombinationInputFormat() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
@@ -1453,7 +1486,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "combination", inspector, false);
+ "combination", inspector, false, 1);
// write the orc file to the mock file system
Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1516,17 +1549,25 @@ public class TestInputOutputFormat {
public void testCombinationInputFormatWithAcid() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
+ final int PARTITIONS = 2;
+ final int BUCKETS = 3;
synchronized (TestOrcFile.class) {
inspector = (StructObjectInspector)
ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class,
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "combinationAcid", inspector, false);
+ "combinationAcid", inspector, false, PARTITIONS);
// write the orc file to the mock file system
- Path partDir = new Path(conf.get("mapred.input.dir"));
- OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
+ Path[] partDir = new Path[PARTITIONS];
+ String[] paths = conf.getStrings("mapred.input.dir");
+ for(int p=0; p < PARTITIONS; ++p) {
+ partDir[p] = new Path(paths[p]);
+ }
+
+ // write a base file in partition 0
+ OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0],
new AcidOutputFormat.Options(conf).maximumTransactionId(10)
.writingBase(true).bucket(0).inspector(inspector));
for(int i=0; i < 10; ++i) {
@@ -1534,31 +1575,68 @@ public class TestInputOutputFormat {
}
WriterImpl baseWriter = (WriterImpl) writer.getWriter();
writer.close(false);
+
MockOutputStream outputStream = (MockOutputStream) baseWriter.getStream();
- int length0 = outputStream.file.length;
- writer = new OrcRecordUpdater(partDir,
+ outputStream.setBlocks(new MockBlock("host1", "host2"));
+
+ // write a delta file in partition 0
+ writer = new OrcRecordUpdater(partDir[0],
new AcidOutputFormat.Options(conf).maximumTransactionId(10)
.writingBase(true).bucket(1).inspector(inspector));
for(int i=10; i < 20; ++i) {
writer.insert(10, new MyRow(i, 2*i));
}
- baseWriter = (WriterImpl) writer.getWriter();
+ WriterImpl deltaWriter = (WriterImpl) writer.getWriter();
+ outputStream = (MockOutputStream) deltaWriter.getStream();
writer.close(false);
- outputStream = (MockOutputStream) baseWriter.getStream();
outputStream.setBlocks(new MockBlock("host1", "host2"));
+ // write three files in partition 1
+ for(int bucket=0; bucket < BUCKETS; ++bucket) {
+ Writer orc = OrcFile.createWriter(
+ new Path(partDir[1], "00000" + bucket + "_0"),
+ OrcFile.writerOptions(conf)
+ .blockPadding(false)
+ .bufferSize(1024)
+ .inspector(inspector));
+ orc.addRow(new MyRow(1, 2));
+ outputStream = (MockOutputStream) ((WriterImpl) orc).getStream();
+ orc.close();
+ outputStream.setBlocks(new MockBlock("host3", "host4"));
+ }
+
// call getsplits
+ conf.setInt(hive_metastoreConstants.BUCKET_COUNT, BUCKETS);
HiveInputFormat<?,?> inputFormat =
new CombineHiveInputFormat<WritableComparable, Writable>();
- try {
- InputSplit[] splits = inputFormat.getSplits(conf, 1);
- assertTrue("shouldn't reach here", false);
- } catch (IOException ioe) {
- assertEquals("CombineHiveInputFormat is incompatible"
- + " with ACID tables. Please set hive.input.format=org.apache.hadoop"
- + ".hive.ql.io.HiveInputFormat",
- ioe.getMessage());
+ InputSplit[] splits = inputFormat.getSplits(conf, 1);
+ assertEquals(3, splits.length);
+ HiveInputFormat.HiveInputSplit split =
+ (HiveInputFormat.HiveInputSplit) splits[0];
+ assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+ split.inputFormatClassName());
+ assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00000",
+ split.getPath().toString());
+ assertEquals(0, split.getStart());
+ assertEquals(580, split.getLength());
+ split = (HiveInputFormat.HiveInputSplit) splits[1];
+ assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+ split.inputFormatClassName());
+ assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00001",
+ split.getPath().toString());
+ assertEquals(0, split.getStart());
+ assertEquals(601, split.getLength());
+ CombineHiveInputFormat.CombineHiveInputSplit combineSplit =
+ (CombineHiveInputFormat.CombineHiveInputSplit) splits[2];
+ assertEquals(BUCKETS, combineSplit.getNumPaths());
+ for(int bucket=0; bucket < BUCKETS; ++bucket) {
+ assertEquals("mock:/combinationAcid/p=1/00000" + bucket + "_0",
+ combineSplit.getPath(bucket).toString());
+ assertEquals(0, combineSplit.getOffset(bucket));
+ assertEquals(227, combineSplit.getLength(bucket));
}
+ String[] hosts = combineSplit.getLocations();
+ assertEquals(2, hosts.length);
}
@Test