You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/01/19 23:11:52 UTC
orc git commit: ORC-132. Implement a merge file method and fix the
number of rows written during merge. (omalley)
Repository: orc
Updated Branches:
refs/heads/master 00c5682ad -> 33bd60603
ORC-132. Implement a merge file method and fix the number of rows written
during merge. (omalley)
Fixes #84
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/33bd6060
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/33bd6060
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/33bd6060
Branch: refs/heads/master
Commit: 33bd6060328a088657df062d2a51447c6d292992
Parents: 00c5682
Author: Owen O'Malley <om...@apache.org>
Authored: Wed Jan 18 16:55:17 2017 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Thu Jan 19 14:55:07 2017 -0800
----------------------------------------------------------------------
java/core/src/java/org/apache/orc/OrcFile.java | 230 ++++++++++++++++++-
java/core/src/java/org/apache/orc/Writer.java | 3 +-
.../org/apache/orc/impl/PhysicalFsWriter.java | 3 -
.../java/org/apache/orc/impl/ReaderImpl.java | 16 +-
.../java/org/apache/orc/impl/WriterImpl.java | 20 +-
.../test/org/apache/orc/TestVectorOrcFile.java | 170 ++++++++++++++
6 files changed, 428 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index 68e49f3..cfabba9 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -19,20 +19,29 @@
package org.apache.orc;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.impl.MemoryManager;
import org.apache.orc.impl.OrcTail;
import org.apache.orc.impl.ReaderImpl;
import org.apache.orc.impl.WriterImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Contains factory methods to read or write ORC files.
*/
public class OrcFile {
+ private static final Logger LOG = LoggerFactory.getLogger(OrcFile.class);
public static final String MAGIC = "ORC";
/**
@@ -52,7 +61,8 @@ public class OrcFile {
*/
public enum Version {
V_0_11("0.11", 0, 11),
- V_0_12("0.12", 0, 12);
+ V_0_12("0.12", 0, 12),
+ FUTURE("future", Integer.MAX_VALUE, Integer.MAX_VALUE);
public static final Version CURRENT = V_0_12;
@@ -248,7 +258,7 @@ public class OrcFile {
void preFooterWrite(WriterContext context) throws IOException;
}
- public static enum BloomFilterVersion {
+ public enum BloomFilterVersion {
// Include both the BLOOM_FILTER and BLOOM_FILTER_UTF8 streams to support
// both old and new readers.
ORIGINAL("original"),
@@ -257,7 +267,7 @@ public class OrcFile {
UTF8("utf8");
private final String id;
- private BloomFilterVersion(String id) {
+ BloomFilterVersion(String id) {
this.id = id;
}
@@ -299,6 +309,7 @@ public class OrcFile {
private double bloomFilterFpp;
private BloomFilterVersion bloomFilterVersion;
private PhysicalWriter physicalWriter;
+ private WriterVersion writerVersion = CURRENT_WRITER;
protected WriterOptions(Properties tableProperties, Configuration conf) {
configuration = conf;
@@ -508,6 +519,20 @@ public class OrcFile {
return this;
}
+ /**
+ * Override the writer version that is recorded in the written file
+ * instead of the default {@code CURRENT_WRITER}.
+ * This is an internal API; the file merger uses it so that the output
+ * keeps the writer version of its inputs.
+ * @param version the writer version to record; must not be FUTURE
+ * @return this
+ * @throws IllegalArgumentException if version is WriterVersion.FUTURE
+ */
+ protected WriterOptions writerVersion(WriterVersion version) {
+ if (version != WriterVersion.FUTURE) {
+ writerVersion = version;
+ return this;
+ }
+ throw new IllegalArgumentException("Can't write a future version.");
+ }
+
public boolean getBlockPadding() {
return blockPaddingValue;
}
@@ -587,6 +612,10 @@ public class OrcFile {
public PhysicalWriter getPhysicalWriter() {
return physicalWriter;
}
+
+ /**
+ * Get the writer version that will be recorded in the file.
+ * @return CURRENT_WRITER unless it was overridden via writerVersion()
+ */
+ public WriterVersion getWriterVersion() {
+ return this.writerVersion;
+ }
}
/**
@@ -642,4 +671,199 @@ public class OrcFile {
return new WriterImpl(fs, path, opts);
}
+ /**
+ * Check whether this library can interpret the versions recorded in a
+ * file before it is merged.
+ * A rejection is logged at INFO level together with the path.
+ * @param path the path of the file, used only for the log message
+ * @param reader the ORC file reader
+ * @return false if the file version or the writer version is FUTURE,
+ * true otherwise
+ */
+ static boolean understandFormat(Path path, Reader reader) {
+ String rejection = null;
+ if (reader.getFileVersion() == Version.FUTURE) {
+ rejection = "Can't merge {} because it has a future version.";
+ } else if (reader.getWriterVersion() == WriterVersion.FUTURE) {
+ rejection = "Can't merge {} because it has a future writerVersion.";
+ }
+ if (rejection == null) {
+ return true;
+ }
+ LOG.info(rejection, path);
+ return false;
+ }
+
+ /**
+ * Is the new reader compatible with the file that is being written?
+ * The checks run in order (schema, compression, file version, writer
+ * version, row index stride, user metadata) and the first mismatch is
+ * logged at INFO level with the offending path and both values.
+ * @param schema the writer schema
+ * @param fileVersion the writer fileVersion
+ * @param writerVersion the writer writerVersion
+ * @param rowIndexStride the row index stride
+ * @param compression the compression that was used
+ * @param userMetadata the user metadata merged so far; only keys present
+ * both here and in the reader are compared
+ * @param path the new path name for warning messages
+ * @param reader the new reader
+ * @return is the reader compatible with the previous ones?
+ */
+ static boolean readerIsCompatible(TypeDescription schema,
+ Version fileVersion,
+ WriterVersion writerVersion,
+ int rowIndexStride,
+ CompressionKind compression,
+ Map<String, ByteBuffer> userMetadata,
+ Path path,
+ Reader reader) {
+ // now we have to check compatibility
+ if (!reader.getSchema().equals(schema)) {
+ LOG.info("Can't merge {} because of different schemas {} vs {}",
+ path, reader.getSchema(), schema);
+ return false;
+ }
+ if (reader.getCompressionKind() != compression) {
+ LOG.info("Can't merge {} because of different compression {} vs {}",
+ path, reader.getCompressionKind(), compression);
+ return false;
+ }
+ if (reader.getFileVersion() != fileVersion) {
+ LOG.info("Can't merge {} because of different file versions {} vs {}",
+ path, reader.getFileVersion(), fileVersion);
+ return false;
+ }
+ if (reader.getWriterVersion() != writerVersion) {
+ // report the writer versions that actually differ (not the file versions)
+ LOG.info("Can't merge {} because of different writer versions {} vs {}",
+ path, reader.getWriterVersion(), writerVersion);
+ return false;
+ }
+ if (reader.getRowIndexStride() != rowIndexStride) {
+ LOG.info("Can't merge {} because of different row index strides {} vs {}",
+ path, reader.getRowIndexStride(), rowIndexStride);
+ return false;
+ }
+ for(String key: reader.getMetadataKeys()) {
+ if (userMetadata.containsKey(key)) {
+ ByteBuffer currentValue = userMetadata.get(key);
+ ByteBuffer newValue = reader.getMetadataValue(key);
+ if (!newValue.equals(currentValue)) {
+ LOG.info("Can't merge {} because of different user metadata {}", path,
+ key);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Copy every user metadata entry of the reader into the given map,
+ * replacing any value already stored under the same key.
+ * @param metadata the accumulated user metadata, modified in place
+ * @param reader the reader whose user metadata is added
+ */
+ static void mergeMetadata(Map<String,ByteBuffer> metadata,
+ Reader reader) {
+ for (String metadataKey : reader.getMetadataKeys()) {
+ ByteBuffer metadataValue = reader.getMetadataValue(metadataKey);
+ metadata.put(metadataKey, metadataValue);
+ }
+ }
+
+ /**
+ * Merges multiple ORC files that all have the same schema to produce
+ * a single ORC file.
+ * The first readable input fixes the schema, compression kind, compression
+ * size, file version, writer version and row index stride of the output.
+ * Later inputs that disagree with any of those (other than the compression
+ * size) or with the user metadata collected so far are skipped, so the
+ * returned list may be shorter than the input list.
+ * The stripes are copied as serialized byte buffers and the output's
+ * compression size is raised to the largest one among the merged inputs.
+ * If no input can be merged, no output file is created.
+ *
+ * @param outputPath the output file
+ * @param options the options for writing with; the settings listed above
+ * are overwritten on this object (it is modified in place), while
+ * unrelated settings such as the block size are kept
+ * @param inputFiles the list of files to merge
+ * @return the list of files that were successfully merged
+ * @throws IOException if an input can't be read or the output can't be
+ * written; a partially written output file is then closed and deleted
+ * on a best-effort basis before the exception is rethrown
+ */
+ public static List<Path> mergeFiles(Path outputPath,
+ WriterOptions options,
+ List<Path> inputFiles) throws IOException {
+ Writer output = null;
+ final Configuration conf = options.getConfiguration();
+ try {
+ // one copy buffer reused across all stripes, grown only when needed
+ byte[] buffer = new byte[0];
+ TypeDescription schema = null;
+ CompressionKind compression = null;
+ int bufferSize = 0;
+ Version fileVersion = null;
+ WriterVersion writerVersion = null;
+ int rowIndexStride = 0;
+ List<Path> result = new ArrayList<>(inputFiles.size());
+ Map<String, ByteBuffer> userMetadata = new HashMap<>();
+
+ for (Path input : inputFiles) {
+ FileSystem fs = input.getFileSystem(conf);
+ Reader reader = createReader(input,
+ readerOptions(conf).filesystem(fs));
+
+ if (!understandFormat(input, reader)) {
+ continue;
+ } else if (schema == null) {
+ // if this is the first file that we are including, grab the values
+ schema = reader.getSchema();
+ compression = reader.getCompressionKind();
+ bufferSize = reader.getCompressionSize();
+ rowIndexStride = reader.getRowIndexStride();
+ fileVersion = reader.getFileVersion();
+ writerVersion = reader.getWriterVersion();
+ // The compression chunk size is unrelated to the file system block
+ // size, so the caller's blockSize is left alone. Using the small
+ // compression size as the block size may cause needless block padding
+ // and may be rejected by file systems enforcing a minimum block size.
+ options.version(fileVersion)
+ .writerVersion(writerVersion)
+ .compress(compression)
+ .rowIndexStride(rowIndexStride)
+ .setSchema(schema);
+ if (compression != CompressionKind.NONE) {
+ options.enforceBufferSize().bufferSize(bufferSize);
+ }
+ mergeMetadata(userMetadata, reader);
+ output = createWriter(outputPath, options);
+ } else if (!readerIsCompatible(schema, fileVersion, writerVersion,
+ rowIndexStride, compression, userMetadata, input, reader)) {
+ continue;
+ } else {
+ mergeMetadata(userMetadata, reader);
+ if (bufferSize < reader.getCompressionSize()) {
+ // copied stripes keep their original chunking; presumably the
+ // output has to advertise the largest chunk size so readers can
+ // decompress every stripe
+ bufferSize = reader.getCompressionSize();
+ ((WriterImpl) output).increaseCompressionSize(bufferSize);
+ }
+ }
+ List<OrcProto.StripeStatistics> statList =
+ reader.getOrcProtoStripeStatistics();
+ try (FSDataInputStream inputStream = fs.open(input)) {
+ int stripeNum = 0;
+ result.add(input);
+
+ for (StripeInformation stripe : reader.getStripes()) {
+ int length = (int) stripe.getLength();
+ if (buffer.length < length) {
+ buffer = new byte[length];
+ }
+ long offset = stripe.getOffset();
+ inputStream.readFully(offset, buffer, 0, length);
+ // NOTE(review): assumes one stripe statistics entry per stripe
+ output.appendStripe(buffer, 0, length, stripe, statList.get(stripeNum++));
+ }
+ }
+ }
+ if (output != null) {
+ for (Map.Entry<String, ByteBuffer> entry : userMetadata.entrySet()) {
+ output.addUserMetadata(entry.getKey(), entry.getValue());
+ }
+ output.close();
+ }
+ return result;
+ } catch (IOException ioe) {
+ if (output != null) {
+ try {
+ output.close();
+ } catch (Throwable ignored) {
+ // best effort: the original IOException is what gets reported
+ }
+ try {
+ FileSystem fs = options.getFileSystem() == null ?
+ outputPath.getFileSystem(conf) : options.getFileSystem();
+ fs.delete(outputPath, false);
+ } catch (Throwable ignored) {
+ // best effort: don't mask the original IOException
+ }
+ }
+ throw ioe;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/java/org/apache/orc/Writer.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/Writer.java b/java/core/src/java/org/apache/orc/Writer.java
index 596e14e..b496594 100644
--- a/java/core/src/java/org/apache/orc/Writer.java
+++ b/java/core/src/java/org/apache/orc/Writer.java
@@ -20,6 +20,7 @@ package org.apache.orc;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -31,7 +32,7 @@ import org.apache.orc.TypeDescription;
/**
* The interface for writing ORC files.
*/
-public interface Writer {
+public interface Writer extends Closeable {
/**
* Get the schema for this writer
http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 48a0b42..17c73ff 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -204,9 +204,6 @@ public class PhysicalFsWriter implements PhysicalWriter {
public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
builder.setFooterLength(footerLength);
builder.setMetadataLength(metadataLength);
- if (compress != CompressionKind.NONE) {
- builder.setCompressionBlockSize(bufferSize);
- }
OrcProto.PostScript ps = builder.build();
// need to write this uncompressed
long startPosn = rawWriter.getPos();
http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index c24920d..9c8c06b 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -187,16 +187,22 @@ public class ReaderImpl implements Reader {
return types;
}
- @Override
- public OrcFile.Version getFileVersion() {
+ /**
+ * Translate a file's recorded version list into a known
+ * {@link OrcFile.Version}.
+ * @param versionList the recorded version as [major, minor]; may be null
+ * @return V_0_11 when no version is recorded, the matching version when it
+ * is known, and FUTURE when it is unrecognized or malformed
+ */
+ public static OrcFile.Version getFileVersion(List<Integer> versionList) {
+ if (versionList == null || versionList.isEmpty()) {
+ return OrcFile.Version.V_0_11;
+ }
+ if (versionList.size() < 2) {
+ // no minor component to compare against; report it as unknown rather
+ // than failing with an IndexOutOfBoundsException below
+ return OrcFile.Version.FUTURE;
+ }
for (OrcFile.Version version: OrcFile.Version.values()) {
- if ((versionList != null && !versionList.isEmpty()) &&
- version.getMajor() == versionList.get(0) &&
+ if (version.getMajor() == versionList.get(0) &&
version.getMinor() == versionList.get(1)) {
return version;
}
}
- return OrcFile.Version.V_0_11;
+ return OrcFile.Version.FUTURE;
+ }
+
+ /**
+ * Report this file's format version by translating its version list.
+ */
+ @Override
+ public OrcFile.Version getFileVersion() {
+ return ReaderImpl.getFileVersion(versionList);
}
@Override
http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index d9140e8..2779212 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -107,10 +107,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private final int rowIndexStride;
private final CompressionKind compress;
private final CompressionCodec codec;
- private final int bufferSize;
+ private int bufferSize;
private final long blockSize;
private final TypeDescription schema;
private final PhysicalWriter physicalWriter;
+ private final OrcFile.WriterVersion writerVersion;
private int columnCount;
private long rowCount = 0;
@@ -145,6 +146,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.conf = opts.getConfiguration();
this.callback = opts.getCallback();
this.schema = opts.getSchema();
+ this.writerVersion = opts.getWriterVersion();
bloomFilterVersion = opts.getBloomFilterVersion();
if (callback != null) {
callbackContext = new OrcFile.WriterContext(){
@@ -211,6 +213,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
return estBufferSize > bs ? bs : estBufferSize;
}
+ /**
+ * Grow the compression buffer size recorded by this writer.
+ * The size only ever increases; a smaller value is ignored.
+ * This function is internal only and should only be called by the
+ * ORC file merger, which copies stripes whose compression size may be
+ * larger than the one this writer started with.
+ * @param newSize the requested buffer size
+ */
+ public void increaseCompressionSize(int newSize) {
+ bufferSize = Math.max(bufferSize, newSize);
+ }
+
private static int getClosestBufferSize(int estBufferSize) {
final int kb4 = 4 * 1024;
final int kb8 = 8 * 1024;
@@ -2736,7 +2750,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
.setMagic(OrcFile.MAGIC)
.addVersion(version.getMajor())
.addVersion(version.getMinor())
- .setWriterVersion(OrcFile.CURRENT_WRITER.getId());
+ .setWriterVersion(writerVersion.getId());
if (compress != CompressionKind.NONE) {
builder.setCompressionBlockSize(bufferSize);
}
@@ -2864,6 +2878,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
checkArgument(stripeStatistics != null,
"Stripe statistics must not be null");
+ rowsInStripe = stripeInfo.getNumberOfRows();
// update stripe information
OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation
.newBuilder()
@@ -2883,6 +2898,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
stripes.add(dirEntry.build());
// reset it after writing the stripe
+ rowCount += rowsInStripe;
rowsInStripe = 0;
}
http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 7df521d..7801156 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -20,6 +20,7 @@ package org.apache.orc;
import com.google.common.collect.Lists;
+import org.apache.orc.impl.ReaderImpl;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -51,12 +52,15 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URL;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
@@ -2944,4 +2948,170 @@ public class TestVectorOrcFile {
assertEquals(OrcFile.WriterVersion.ORIGINAL, OrcFile.WriterVersion.from(0));
assertEquals(OrcFile.WriterVersion.HIVE_4243, OrcFile.WriterVersion.from(2));
}
+
+ /**
+ * Check that ReaderImpl.getFileVersion maps missing, known and unknown
+ * version lists to the expected OrcFile.Version.
+ * @throws Exception
+ */
+ @Test
+ public void testFileVersion() throws Exception {
+ // no recorded version is treated as the oldest format
+ assertEquals(OrcFile.Version.V_0_11, ReaderImpl.getFileVersion(null));
+ assertEquals(OrcFile.Version.V_0_11,
+ ReaderImpl.getFileVersion(Arrays.<Integer>asList()));
+ // versions this library knows about
+ assertEquals(OrcFile.Version.V_0_11,
+ ReaderImpl.getFileVersion(Arrays.asList(0, 11)));
+ assertEquals(OrcFile.Version.V_0_12,
+ ReaderImpl.getFileVersion(Arrays.asList(0, 12)));
+ // anything unrecognized is reported as a future version
+ assertEquals(OrcFile.Version.FUTURE,
+ ReaderImpl.getFileVersion(Arrays.asList(9999, 0)));
+ }
+
+ /**
+ * Check that understandFormat rejects readers reporting a future file
+ * version or a future writer version and accepts current ones.
+ */
+ @Test
+ public void testMergeUnderstood() throws Exception {
+ Path path = new Path("test.orc");
+
+ Reader futureFileVersion = Mockito.mock(Reader.class);
+ Mockito.when(futureFileVersion.getFileVersion()).thenReturn(OrcFile.Version.FUTURE);
+ Mockito.when(futureFileVersion.getWriterVersion()).thenReturn(OrcFile.WriterVersion.HIVE_4243);
+
+ Reader futureWriterVersion = Mockito.mock(Reader.class);
+ Mockito.when(futureWriterVersion.getFileVersion()).thenReturn(OrcFile.Version.V_0_11);
+ Mockito.when(futureWriterVersion.getWriterVersion()).thenReturn(OrcFile.WriterVersion.FUTURE);
+
+ Reader currentVersions = Mockito.mock(Reader.class);
+ Mockito.when(currentVersions.getFileVersion()).thenReturn(OrcFile.Version.CURRENT);
+ Mockito.when(currentVersions.getWriterVersion()).thenReturn(OrcFile.CURRENT_WRITER);
+
+ Assert.assertFalse(OrcFile.understandFormat(path, futureFileVersion));
+ Assert.assertFalse(OrcFile.understandFormat(path, futureWriterVersion));
+ Assert.assertTrue(OrcFile.understandFormat(path, currentVersions));
+ }
+
+ /** Encode a string as UTF-8 and wrap the bytes in a ByteBuffer. */
+ static ByteBuffer fromString(String s) {
+ byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
+ return ByteBuffer.wrap(utf8);
+ }
+
+ /** Render an int as lower-case hex digits encoded in UTF-8 (255 gives "ff"). */
+ static byte[] fromInt(int x) {
+ String hex = Integer.toHexString(x);
+ return hex.getBytes(StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Merge three files that differ only in compression buffer size and user
+ * metadata, then merge a mix that also contains a file with a different
+ * schema and an uncompressed file, and check that those two are skipped.
+ * @throws Exception
+ */
+ @Test
+ public void testMerge() throws Exception {
+ Path input1 = new Path(workDir, "TestVectorOrcFile.testMerge1.orc");
+ fs.delete(input1, false);
+ Path input2 = new Path(workDir, "TestVectorOrcFile.testMerge2.orc");
+ fs.delete(input2, false);
+ Path input3 = new Path(workDir, "TestVectorOrcFile.testMerge3.orc");
+ fs.delete(input3, false);
+ TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
+ // change all of the options away from default to find anything we
+ // don't copy to the merged file
+ OrcFile.WriterOptions opts = OrcFile.writerOptions(conf)
+ .setSchema(schema)
+ .compress(CompressionKind.LZO)
+ .enforceBufferSize()
+ .bufferSize(20*1024)
+ .rowIndexStride(1000)
+ .version(OrcFile.Version.V_0_11)
+ .writerVersion(OrcFile.WriterVersion.HIVE_8732);
+
+ // input1: 20k buffer, user metadata a=foo and b=bar
+ Writer writer = OrcFile.createWriter(input1, opts);
+ VectorizedRowBatch batch = schema.createRowBatch();
+ batch.size = 1024;
+ for(int r=0; r < 1024; ++r) {
+ ((LongColumnVector) batch.cols[0]).vector[r] = r;
+ ((BytesColumnVector) batch.cols[1]).setVal(r, fromInt(r));
+ }
+ writer.addRowBatch(batch);
+ writer.addUserMetadata("a", fromString("foo"));
+ writer.addUserMetadata("b", fromString("bar"));
+ writer.close();
+
+ // increase the buffer size to 30k
+ // input2: user metadata a=foo (same value as input1) and c=baz
+ opts.bufferSize(30*1024);
+ writer = OrcFile.createWriter(input2, opts);
+ batch.size = 1024;
+ for(int r=0; r < 1024; ++r) {
+ ((LongColumnVector) batch.cols[0]).vector[r] = 2 * r;
+ ((BytesColumnVector) batch.cols[1]).setVal(r, fromInt(2 * r));
+ }
+ writer.addRowBatch(batch);
+ writer.addUserMetadata("a", fromString("foo"));
+ writer.addUserMetadata("c", fromString("baz"));
+ writer.close();
+
+ // decrease the buffer size to 10k
+ // input3: user metadata c=baz (same value as input2) and d=bat
+ opts.bufferSize(10*1024);
+ writer = OrcFile.createWriter(input3, opts);
+ batch.size = 1024;
+ for(int r=0; r < 1024; ++r) {
+ ((LongColumnVector) batch.cols[0]).vector[r] = 3 * r;
+ ((BytesColumnVector) batch.cols[1]).setVal(r, fromInt(3 * r));
+ }
+ writer.addRowBatch(batch);
+ writer.addUserMetadata("c", fromString("baz"));
+ writer.addUserMetadata("d", fromString("bat"));
+ writer.close();
+
+ // Merge the three compatible inputs with default writer options. The
+ // result must take its encoding settings from the inputs, use the largest
+ // buffer size (30k), have one stripe per input and the union of metadata.
+ Path output1 = new Path(workDir, "TestVectorOrcFile.testMerge.out1.orc");
+ fs.delete(output1, false);
+ List<Path> paths = OrcFile.mergeFiles(output1,
+ OrcFile.writerOptions(conf), Arrays.asList(input1, input2, input3));
+ assertEquals(3, paths.size());
+ Reader reader = OrcFile.createReader(output1, OrcFile.readerOptions(conf));
+ assertEquals(3 * 1024, reader.getNumberOfRows());
+ assertEquals(CompressionKind.LZO, reader.getCompressionKind());
+ assertEquals(30 * 1024, reader.getCompressionSize());
+ assertEquals(1000, reader.getRowIndexStride());
+ assertEquals(OrcFile.Version.V_0_11, reader.getFileVersion());
+ assertEquals(OrcFile.WriterVersion.HIVE_8732, reader.getWriterVersion());
+ assertEquals(3, reader.getStripes().size());
+ assertEquals(4, reader.getMetadataKeys().size());
+ assertEquals(fromString("foo"), reader.getMetadataValue("a"));
+ assertEquals(fromString("bar"), reader.getMetadataValue("b"));
+ assertEquals(fromString("baz"), reader.getMetadataValue("c"));
+ assertEquals(fromString("bat"), reader.getMetadataValue("d"));
+
+ // input4 has a different schema, so mergeFiles must skip it
+ TypeDescription schema4 = TypeDescription.fromString("struct<a:int>");
+ Path input4 = new Path(workDir, "TestVectorOrcFile.testMerge4.orc");
+ fs.delete(input4, false);
+ opts.setSchema(schema4);
+ writer = OrcFile.createWriter(input4, opts);
+ batch = schema4.createRowBatch();
+ batch.size = 1024;
+ for(int r=0; r < 1024; ++r) {
+ ((LongColumnVector) batch.cols[0]).vector[r] = 4 * r;
+ }
+ writer.addRowBatch(batch);
+ writer.close();
+
+ // input5 has the original schema but is uncompressed, so mergeFiles must
+ // skip it as well
+ Path input5 = new Path(workDir, "TestVectorOrcFile.testMerge5.orc");
+ fs.delete(input5, false);
+ opts.setSchema(schema)
+ .compress(CompressionKind.NONE)
+ .bufferSize(100*1024);
+ writer = OrcFile.createWriter(input5, opts);
+ batch = schema.createRowBatch();
+ batch.size = 1024;
+ for(int r=0; r < 1024; ++r) {
+ // NOTE(review): column a uses 4 * r while column b uses 5 * r; harmless
+ // because input5 is rejected, but probably meant 5 * r
+ ((LongColumnVector) batch.cols[0]).vector[r] = 4 * r;
+ ((BytesColumnVector) batch.cols[1]).setVal(r, fromInt(5 * r));
+ }
+ writer.addRowBatch(batch);
+ writer.close();
+
+ // Merge again starting with input3 (10k buffer): input4 and input5 are
+ // rejected and input1 raises the buffer size to 20k.
+ Path output2 = new Path(workDir, "TestVectorOrcFile.testMerge.out2.orc");
+ fs.delete(output2, false);
+ paths = OrcFile.mergeFiles(output2, OrcFile.writerOptions(conf),
+ Arrays.asList(input3, input4, input1, input5));
+ assertEquals(2, paths.size());
+ reader = OrcFile.createReader(output2, OrcFile.readerOptions(conf));
+ assertEquals(2 * 1024, reader.getNumberOfRows());
+ assertEquals(CompressionKind.LZO, reader.getCompressionKind());
+ assertEquals(20 * 1024, reader.getCompressionSize());
+ assertEquals(1000, reader.getRowIndexStride());
+ assertEquals(OrcFile.Version.V_0_11, reader.getFileVersion());
+ assertEquals(OrcFile.WriterVersion.HIVE_8732, reader.getWriterVersion());
+ assertEquals(2, reader.getStripes().size());
+ assertEquals(4, reader.getMetadataKeys().size());
+ assertEquals(fromString("foo"), reader.getMetadataValue("a"));
+ assertEquals(fromString("bar"), reader.getMetadataValue("b"));
+ assertEquals(fromString("baz"), reader.getMetadataValue("c"));
+ assertEquals(fromString("bat"), reader.getMetadataValue("d"));
+ }
}