Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/14 21:26:29 UTC
svn commit: r1624899 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/io/ java/org/apache/hadoop/hive/ql/io/orc/
test/org/apache/hadoop/hive/ql/io/orc/
Author: gunther
Date: Sun Sep 14 19:26:28 2014
New Revision: 1624899
URL: http://svn.apache.org/r1624899
Log:
Revert change 'HIVE-7812: Disable CombineHiveInputFormat when ACID format is used' (Gunther Hagleitner)
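In short: HIVE-7812 had CombineHiveInputFormat route ACID paths to the plain HiveInputFormat through a new AvoidSplitCombination marker interface; this revert removes that plumbing and restores the older fail-fast behavior, where split combining aborts on ACID directories. A minimal sketch of the restored check, in the spirit of the hunks below; the literal marker-file value is an assumption (the committed code reads it from OrcRecordUpdater.ACID_FORMAT):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AcidCombineCheckSketch {
  // Hypothetical stand-in for OrcRecordUpdater.ACID_FORMAT.
  private static final String ACID_FORMAT = "_orc_acid_version";

  // Restored behavior: refuse to combine splits over an ACID directory.
  static void checkCombinable(Path dir, Configuration conf) throws IOException {
    FileSystem fs = dir.getFileSystem(conf);
    if (fs.exists(new Path(dir, ACID_FORMAT))) {
      throw new IOException("CombineHiveInputFormat is incompatible" +
          " with ACID tables. Please set hive.input.format=" +
          "org.apache.hadoop.hive.ql.io.HiveInputFormat");
    }
  }
}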
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1624899&r1=1624898&r2=1624899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Sun Sep 14 19:26:28 2014
@@ -305,28 +305,6 @@ public class AcidUtils {
}
/**
- * Is the given directory in ACID format?
- * @param directory the partition directory to check
- * @param conf the query configuration
- * @return true, if it is an ACID directory
- * @throws IOException
- */
- public static boolean isAcid(Path directory,
- Configuration conf) throws IOException {
- FileSystem fs = directory.getFileSystem(conf);
- for(FileStatus file: fs.listStatus(directory)) {
- String filename = file.getPath().getName();
- if (filename.startsWith(BASE_PREFIX) ||
- filename.startsWith(DELTA_PREFIX)) {
- if (file.isDir()) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
* Get the ACID state of the given directory. It finds the minimal set of
* base and diff directories. Note that because major compactions don't
* preserve the history, we can't use a base directory that includes a
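For context, the isAcid helper deleted above answered a single question: does the given partition directory contain any base_* or delta_* subdirectory? A hedged sketch of the call site this revert also removes (see the OrcInputFormat hunk further down); the wrapper class is illustrative, the call itself mirrors the deleted code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

// Illustrative only: before this revert, OrcInputFormat.shouldSkipCombine
// delegated straight to the helper to keep ACID paths out of combining.
class IsAcidCallerSketch {
  static boolean skipCombine(Path partitionDir, Configuration conf)
      throws IOException {
    return AcidUtils.isAcid(partitionDir, conf);
  }
}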
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=1624899&r1=1624898&r2=1624899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Sun Sep 14 19:26:28 2014
@@ -122,7 +122,24 @@ public class BucketizedHiveInputFormat<K
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
init(job);
- Path[] dirs = getInputPaths(job);
+ Path[] dirs = FileInputFormat.getInputPaths(job);
+ if (dirs.length == 0) {
+ // On Tez we avoid duplicating the file info in FileInputFormat.
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ try {
+ List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
+ dirs = paths.toArray(new Path[paths.size()]);
+ if (dirs.length == 0) {
+ // if we still don't have any files, it's time to fail.
+ throw new IOException("No input paths specified in job");
+ }
+ } catch (Exception e) {
+ throw new IOException("Could not create input paths", e);
+ }
+ } else {
+ throw new IOException("No input paths specified in job");
+ }
+ }
JobConf newjob = new JobConf(job);
ArrayList<InputSplit> result = new ArrayList<InputSplit>();
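The block restored above inlines the Tez fallback that HIVE-7812 had factored out into a getInputPaths helper. A standalone sketch of the restored lookup order, using only calls visible in this diff (the wrapping class, method name, and the mrwork parameter are illustrative; the real code reads mrwork from a field set in init(job)):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

class InputPathFallbackSketch {
  // Lookup order: mapred.input.dir first; on Tez, fall back to the paths
  // recorded in the serialized plan; otherwise fail.
  static Path[] resolve(JobConf job, MapWork mrwork) throws IOException {
    Path[] dirs = FileInputFormat.getInputPaths(job);
    if (dirs.length == 0) {
      if ("tez".equals(HiveConf.getVar(job,
          HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
        try {
          List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
          dirs = paths.toArray(new Path[paths.size()]);
        } catch (Exception e) {
          throw new IOException("Could not create input paths", e);
        }
      }
      if (dirs.length == 0) {
        throw new IOException("No input paths specified in job");
      }
    }
    return dirs;
  }
}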
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1624899&r1=1624898&r2=1624899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Sun Sep 14 19:26:28 2014
@@ -33,7 +33,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -265,8 +264,8 @@ public class CombineHiveInputFormat<K ex
/**
* Create Hive splits based on CombineFileSplit.
*/
- private InputSplit[] getCombineSplits(JobConf job,
- int numSplits) throws IOException {
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
init(job);
@@ -275,6 +274,17 @@ public class CombineHiveInputFormat<K ex
mrwork.getAliasToWork();
CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
.getCombineFileInputFormat();
+
+ // On Tez we avoid duplicating path info since it will go over RPC.
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ try {
+ List<Path> dirs = Utilities.getInputPathsTez(job, mrwork);
+ Utilities.setInputPaths(job, dirs);
+ } catch (Exception e) {
+ throw new IOException("Could not create input paths", e);
+ }
+ }
InputSplit[] splits = null;
if (combine == null) {
@@ -317,6 +327,13 @@ public class CombineHiveInputFormat<K ex
// ignore
}
FileSystem inpFs = path.getFileSystem(job);
+ if (inputFormatClass.isAssignableFrom(OrcInputFormat.class)) {
+ if (inpFs.exists(new Path(path, OrcRecordUpdater.ACID_FORMAT))) {
+ throw new IOException("CombineHiveInputFormat is incompatible" +
+ " with ACID tables. Please set hive.input.format=" +
+ "org.apache.hadoop.hive.ql.io.HiveInputFormat");
+ }
+ }
// Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
// we use a configuration variable for the same
@@ -444,82 +461,6 @@ public class CombineHiveInputFormat<K ex
return result.toArray(new CombineHiveInputSplit[result.size()]);
}
- /**
- * Create Hive splits based on CombineFileSplit.
- */
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- init(job);
- Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
- Map<String, Operator<? extends OperatorDesc>> aliasToWork =
- mrwork.getAliasToWork();
-
- ArrayList<InputSplit> result = new ArrayList<InputSplit>();
-
- Path[] paths = getInputPaths(job);
-
- List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2);
- List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2);
-
- for (Path path : paths) {
-
- PartitionDesc part =
- HiveFileFormatUtils.getPartitionDescFromPathRecursively(
- pathToPartitionInfo, path,
- IOPrepareCache.get().allocatePartitionDescMap());
-
- // Use HiveInputFormat if any of the paths is not splittable
- Class inputFormatClass = part.getInputFileFormatClass();
- String inputFormatClassName = inputFormatClass.getName();
- InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
- if (inputFormat instanceof AvoidSplitCombination &&
- ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The split [" + path +
- "] is being parked for HiveInputFormat.getSplits");
- }
- nonCombinablePaths.add(path);
- } else {
- combinablePaths.add(path);
- }
- }
-
- // Store the previous value for the path specification
- String oldPaths = job.get(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname);
- if (LOG.isDebugEnabled()) {
- LOG.debug("The received input paths are: [" + oldPaths +
- "] against the property "
- + HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname);
- }
-
- // Process the normal splits
- if (nonCombinablePaths.size() > 0) {
- FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray
- (new Path[nonCombinablePaths.size()]));
- InputSplit[] splits = super.getSplits(job, numSplits);
- for (InputSplit split : splits) {
- result.add(split);
- }
- }
-
- // Process the combine splits
- if (combinablePaths.size() > 0) {
- FileInputFormat.setInputPaths(job, combinablePaths.toArray
- (new Path[combinablePaths.size()]));
- InputSplit[] splits = getCombineSplits(job, numSplits);
- for (InputSplit split : splits) {
- result.add(split);
- }
- }
-
- // Restore the old path information back
- // This is just to prevent incompatibilities with previous versions Hive
- // if some application depends on the original value being set.
- job.set(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname, oldPaths);
- LOG.info("Number of all splits " + result.size());
- return result.toArray(new InputSplit[result.size()]);
- }
-
private void processPaths(JobConf job, CombineFileInputFormatShim combine,
List<InputSplitShim> iss, Path... path) throws IOException {
JobConf currJob = new JobConf(job);
@@ -694,12 +635,4 @@ public class CombineHiveInputFormat<K ex
return s.toString();
}
}
-
- /**
- * This is a marker interface that is used to identify the formats where
- * combine split generation is not applicable
- */
- public interface AvoidSplitCombination {
- boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
- }
}
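The roughly eighty lines deleted above implemented a two-phase getSplits: partition the input paths by whether their input format opts out of combining, then send each group through the matching split generator and concatenate the results. A condensed, self-contained sketch of that removed routing (simplified, not the committed code; the marker interface is copied from the deleted block):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

class SplitRoutingSketch {
  // Copy of the marker interface this revert deletes.
  interface AvoidSplitCombination {
    boolean shouldSkipCombine(Path path, Configuration conf) throws IOException;
  }

  // Opted-out paths went to the plain HiveInputFormat split generator;
  // everything else went to getCombineSplits.
  static void route(Object inputFormat, List<Path> paths, Configuration conf,
      List<Path> combinable, List<Path> nonCombinable) throws IOException {
    for (Path path : paths) {
      if (inputFormat instanceof AvoidSplitCombination
          && ((AvoidSplitCombination) inputFormat)
              .shouldSkipCombine(path, conf)) {
        nonCombinable.add(path);
      } else {
        combinable.add(path);
      }
    }
  }
}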
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1624899&r1=1624898&r2=1624899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Sun Sep 14 19:26:28 2014
@@ -295,7 +295,11 @@ public class HiveInputFormat<K extends W
}
}
- Path[] getInputPaths(JobConf job) throws IOException {
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
+ init(job);
+
Path[] dirs = FileInputFormat.getInputPaths(job);
if (dirs.length == 0) {
// On Tez we avoid duplicating the file info in FileInputFormat.
@@ -310,14 +314,6 @@ public class HiveInputFormat<K extends W
throw new IOException("No input paths specified in job");
}
}
- return dirs;
- }
-
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- PerfLogger perfLogger = PerfLogger.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
- init(job);
- Path[] dirs = getInputPaths(job);
JobConf newjob = new JobConf(job);
List<InputSplit> result = new ArrayList<InputSplit>();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1624899&r1=1624898&r2=1624899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Sun Sep 14 19:26:28 2014
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
@@ -100,8 +99,7 @@ import com.google.common.util.concurrent
*/
public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
InputFormatChecker, VectorizedInputFormatInterface,
- AcidInputFormat<NullWritable, OrcStruct>,
- CombineHiveInputFormat.AvoidSplitCombination {
+ AcidInputFormat<NullWritable, OrcStruct> {
private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
@@ -127,12 +125,6 @@ public class OrcInputFormat implements
*/
private static final double MIN_INCLUDED_LOCATION = 0.80;
- @Override
- public boolean shouldSkipCombine(Path path,
- Configuration conf) throws IOException {
- return AcidUtils.isAcid(path, conf);
- }
-
private static class OrcRecordReader
implements org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>,
StatsProvidingRecordReader {
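With shouldSkipCombine gone, ORC no longer opts itself out of split combining, so after this revert the workaround for ACID tables is the one the restored exception names: select the plain HiveInputFormat. A minimal illustration of that setting from client code (the property value comes from the error message above; the surrounding scaffolding is hypothetical):

import org.apache.hadoop.mapred.JobConf;

class AcidInputFormatConfig {
  static JobConf forAcidReads() {
    JobConf conf = new JobConf();
    // Route split generation past CombineHiveInputFormat entirely.
    conf.set("hive.input.format",
        "org.apache.hadoop.hive.ql.io.HiveInputFormat");
    return conf;
  }
}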
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1624899&r1=1624898&r2=1624899&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Sun Sep 14 19:26:28 2014
@@ -118,6 +118,7 @@ public class TestInputOutputFormat {
TimeZone gmt = TimeZone.getTimeZone("GMT+0");
DATE_FORMAT.setTimeZone(gmt);
TIME_FORMAT.setTimeZone(gmt);
+ TimeZone local = TimeZone.getDefault();
}
public static class BigRow implements Writable {
@@ -559,12 +560,6 @@ public class TestInputOutputFormat {
this.file = file;
}
- /**
- * Set the blocks and their location for the file.
- * Must be called after the stream is closed or the block length will be
- * wrong.
- * @param blocks the list of blocks
- */
public void setBlocks(MockBlock... blocks) {
file.blocks = blocks;
int offset = 0;
@@ -585,18 +580,12 @@ public class TestInputOutputFormat {
file.content = new byte[file.length];
System.arraycopy(buf.getData(), 0, file.content, 0, file.length);
}
-
- @Override
- public String toString() {
- return "Out stream to " + file.toString();
- }
}
public static class MockFileSystem extends FileSystem {
final List<MockFile> files = new ArrayList<MockFile>();
Path workingDir = new Path("/");
- @SuppressWarnings("unused")
public MockFileSystem() {
// empty
}
@@ -631,7 +620,7 @@ public class TestInputOutputFormat {
return new FSDataInputStream(new MockInputStream(file));
}
}
- throw new IOException("File not found: " + path);
+ return null;
}
@Override
@@ -754,12 +743,8 @@ public class TestInputOutputFormat {
for(MockBlock block: file.blocks) {
if (OrcInputFormat.SplitGenerator.getOverlap(block.offset,
block.length, start, len) > 0) {
- String[] topology = new String[block.hosts.length];
- for(int i=0; i < topology.length; ++i) {
- topology[i] = "/rack/ " + block.hosts[i];
- }
result.add(new BlockLocation(block.hosts, block.hosts,
- topology, block.offset, block.length));
+ block.offset, block.length));
}
}
return result.toArray(new BlockLocation[result.size()]);
@@ -1224,8 +1209,7 @@ public class TestInputOutputFormat {
Path warehouseDir,
String tableName,
ObjectInspector objectInspector,
- boolean isVectorized,
- int partitions
+ boolean isVectorized
) throws IOException {
Utilities.clearWorkMap();
JobConf conf = new JobConf();
@@ -1234,20 +1218,9 @@ public class TestInputOutputFormat {
conf.set("hive.vectorized.execution.enabled", Boolean.toString(isVectorized));
conf.set("fs.mock.impl", MockFileSystem.class.getName());
conf.set("mapred.mapper.class", ExecMapper.class.getName());
- Path root = new Path(warehouseDir, tableName);
- // clean out previous contents
+ Path root = new Path(warehouseDir, tableName + "/p=0");
((MockFileSystem) root.getFileSystem(conf)).clear();
- // build partition strings
- String[] partPath = new String[partitions];
- StringBuilder buffer = new StringBuilder();
- for(int p=0; p < partitions; ++p) {
- partPath[p] = new Path(root, "p=" + p).toString();
- if (p != 0) {
- buffer.append(',');
- }
- buffer.append(partPath[p]);
- }
- conf.set("mapred.input.dir", buffer.toString());
+ conf.set("mapred.input.dir", root.toString());
StringBuilder columnIds = new StringBuilder();
StringBuilder columnNames = new StringBuilder();
StringBuilder columnTypes = new StringBuilder();
@@ -1276,6 +1249,9 @@ public class TestInputOutputFormat {
tblProps.put("columns.types", columnTypes.toString());
TableDesc tbl = new TableDesc(OrcInputFormat.class, OrcOutputFormat.class,
tblProps);
+ LinkedHashMap<String, String> partSpec =
+ new LinkedHashMap<String, String>();
+ PartitionDesc part = new PartitionDesc(tbl, partSpec);
MapWork mapWork = new MapWork();
mapWork.setVectorMode(isVectorized);
@@ -1284,16 +1260,11 @@ public class TestInputOutputFormat {
new LinkedHashMap<String, ArrayList<String>>();
ArrayList<String> aliases = new ArrayList<String>();
aliases.add(tableName);
+ aliasMap.put(root.toString(), aliases);
+ mapWork.setPathToAliases(aliasMap);
LinkedHashMap<String, PartitionDesc> partMap =
new LinkedHashMap<String, PartitionDesc>();
- for(int p=0; p < partitions; ++p) {
- aliasMap.put(partPath[p], aliases);
- LinkedHashMap<String, String> partSpec =
- new LinkedHashMap<String, String>();
- PartitionDesc part = new PartitionDesc(tbl, partSpec);
- partMap.put(partPath[p], part);
- }
- mapWork.setPathToAliases(aliasMap);
+ partMap.put(root.toString(), part);
mapWork.setPathToPartitionInfo(partMap);
mapWork.setScratchColumnMap(new HashMap<String, Map<String, Integer>>());
mapWork.setScratchColumnVectorTypes(new HashMap<String,
@@ -1314,7 +1285,6 @@ public class TestInputOutputFormat {
* @throws Exception
*/
@Test
- @SuppressWarnings("unchecked")
public void testVectorization() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
@@ -1324,7 +1294,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorization", inspector, true, 1);
+ "vectorization", inspector, true);
// write the orc file to the mock file system
Writer writer =
@@ -1362,7 +1332,6 @@ public class TestInputOutputFormat {
* @throws Exception
*/
@Test
- @SuppressWarnings("unchecked")
public void testVectorizationWithBuckets() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
@@ -1372,7 +1341,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorBuckets", inspector, true, 1);
+ "vectorBuckets", inspector, true);
// write the orc file to the mock file system
Writer writer =
@@ -1408,11 +1377,10 @@ public class TestInputOutputFormat {
// test acid with vectorization, no combine
@Test
- @SuppressWarnings("unchecked")
public void testVectorizationWithAcid() throws Exception {
StructObjectInspector inspector = new BigRowInspector();
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "vectorizationAcid", inspector, true, 1);
+ "vectorizationAcid", inspector, true);
// write the orc file to the mock file system
Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1476,7 +1444,6 @@ public class TestInputOutputFormat {
// test non-vectorized, non-acid, combine
@Test
- @SuppressWarnings("unchecked")
public void testCombinationInputFormat() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
@@ -1486,7 +1453,7 @@ public class TestInputOutputFormat {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "combination", inspector, false, 1);
+ "combination", inspector, false);
// write the orc file to the mock file system
Path partDir = new Path(conf.get("mapred.input.dir"));
@@ -1549,25 +1516,17 @@ public class TestInputOutputFormat {
public void testCombinationInputFormatWithAcid() throws Exception {
// get the object inspector for MyRow
StructObjectInspector inspector;
- final int PARTITIONS = 2;
- final int BUCKETS = 3;
synchronized (TestOrcFile.class) {
inspector = (StructObjectInspector)
ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class,
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
JobConf conf = createMockExecutionEnvironment(workDir, new Path("mock:///"),
- "combinationAcid", inspector, false, PARTITIONS);
+ "combinationAcid", inspector, false);
// write the orc file to the mock file system
- Path[] partDir = new Path[PARTITIONS];
- String[] paths = conf.getStrings("mapred.input.dir");
- for(int p=0; p < PARTITIONS; ++p) {
- partDir[p] = new Path(paths[p]);
- }
-
- // write a base file in partition 0
- OrcRecordUpdater writer = new OrcRecordUpdater(partDir[0],
+ Path partDir = new Path(conf.get("mapred.input.dir"));
+ OrcRecordUpdater writer = new OrcRecordUpdater(partDir,
new AcidOutputFormat.Options(conf).maximumTransactionId(10)
.writingBase(true).bucket(0).inspector(inspector));
for(int i=0; i < 10; ++i) {
@@ -1575,68 +1534,31 @@ public class TestInputOutputFormat {
}
WriterImpl baseWriter = (WriterImpl) writer.getWriter();
writer.close(false);
-
MockOutputStream outputStream = (MockOutputStream) baseWriter.getStream();
- outputStream.setBlocks(new MockBlock("host1", "host2"));
-
- // write a delta file in partition 0
- writer = new OrcRecordUpdater(partDir[0],
+ int length0 = outputStream.file.length;
+ writer = new OrcRecordUpdater(partDir,
new AcidOutputFormat.Options(conf).maximumTransactionId(10)
.writingBase(true).bucket(1).inspector(inspector));
for(int i=10; i < 20; ++i) {
writer.insert(10, new MyRow(i, 2*i));
}
- WriterImpl deltaWriter = (WriterImpl) writer.getWriter();
- outputStream = (MockOutputStream) deltaWriter.getStream();
+ baseWriter = (WriterImpl) writer.getWriter();
writer.close(false);
+ outputStream = (MockOutputStream) baseWriter.getStream();
outputStream.setBlocks(new MockBlock("host1", "host2"));
- // write three files in partition 1
- for(int bucket=0; bucket < BUCKETS; ++bucket) {
- Writer orc = OrcFile.createWriter(
- new Path(partDir[1], "00000" + bucket + "_0"),
- OrcFile.writerOptions(conf)
- .blockPadding(false)
- .bufferSize(1024)
- .inspector(inspector));
- orc.addRow(new MyRow(1, 2));
- outputStream = (MockOutputStream) ((WriterImpl) orc).getStream();
- orc.close();
- outputStream.setBlocks(new MockBlock("host3", "host4"));
- }
-
// call getSplits
- conf.setInt(hive_metastoreConstants.BUCKET_COUNT, BUCKETS);
HiveInputFormat<?,?> inputFormat =
new CombineHiveInputFormat<WritableComparable, Writable>();
- InputSplit[] splits = inputFormat.getSplits(conf, 1);
- assertEquals(3, splits.length);
- HiveInputFormat.HiveInputSplit split =
- (HiveInputFormat.HiveInputSplit) splits[0];
- assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
- split.inputFormatClassName());
- assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00000",
- split.getPath().toString());
- assertEquals(0, split.getStart());
- assertEquals(580, split.getLength());
- split = (HiveInputFormat.HiveInputSplit) splits[1];
- assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
- split.inputFormatClassName());
- assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00001",
- split.getPath().toString());
- assertEquals(0, split.getStart());
- assertEquals(601, split.getLength());
- CombineHiveInputFormat.CombineHiveInputSplit combineSplit =
- (CombineHiveInputFormat.CombineHiveInputSplit) splits[2];
- assertEquals(BUCKETS, combineSplit.getNumPaths());
- for(int bucket=0; bucket < BUCKETS; ++bucket) {
- assertEquals("mock:/combinationAcid/p=1/00000" + bucket + "_0",
- combineSplit.getPath(bucket).toString());
- assertEquals(0, combineSplit.getOffset(bucket));
- assertEquals(227, combineSplit.getLength(bucket));
+ try {
+ InputSplit[] splits = inputFormat.getSplits(conf, 1);
+ assertTrue("shouldn't reach here", false);
+ } catch (IOException ioe) {
+ assertEquals("CombineHiveInputFormat is incompatible"
+ + " with ACID tables. Please set hive.input.format=org.apache.hadoop"
+ + ".hive.ql.io.HiveInputFormat",
+ ioe.getMessage());
}
- String[] hosts = combineSplit.getLocations();
- assertEquals(2, hosts.length);
}
@Test