Posted to commits@orc.apache.org by om...@apache.org on 2017/01/19 23:11:52 UTC

orc git commit: ORC-132. Implement a merge file method and fix the number of rows written during merge. (omalley)

Repository: orc
Updated Branches:
  refs/heads/master 00c5682ad -> 33bd60603


ORC-132. Implement a merge file method and fix the number of rows written
during merge. (omalley)

Fixes #84

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/33bd6060
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/33bd6060
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/33bd6060

Branch: refs/heads/master
Commit: 33bd6060328a088657df062d2a51447c6d292992
Parents: 00c5682
Author: Owen O'Malley <om...@apache.org>
Authored: Wed Jan 18 16:55:17 2017 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Thu Jan 19 14:55:07 2017 -0800

----------------------------------------------------------------------
 java/core/src/java/org/apache/orc/OrcFile.java  | 230 ++++++++++++++++++-
 java/core/src/java/org/apache/orc/Writer.java   |   3 +-
 .../org/apache/orc/impl/PhysicalFsWriter.java   |   3 -
 .../java/org/apache/orc/impl/ReaderImpl.java    |  16 +-
 .../java/org/apache/orc/impl/WriterImpl.java    |  20 +-
 .../test/org/apache/orc/TestVectorOrcFile.java  | 170 ++++++++++++++
 6 files changed, 428 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index 68e49f3..cfabba9 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -19,20 +19,29 @@
 package org.apache.orc;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.impl.MemoryManager;
 import org.apache.orc.impl.OrcTail;
 import org.apache.orc.impl.ReaderImpl;
 import org.apache.orc.impl.WriterImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Contains factory methods to read or write ORC files.
  */
 public class OrcFile {
+  private static final Logger LOG = LoggerFactory.getLogger(OrcFile.class);
   public static final String MAGIC = "ORC";
 
   /**
@@ -52,7 +61,8 @@ public class OrcFile {
    */
   public enum Version {
     V_0_11("0.11", 0, 11),
-    V_0_12("0.12", 0, 12);
+    V_0_12("0.12", 0, 12),
+    FUTURE("future", Integer.MAX_VALUE, Integer.MAX_VALUE);
 
     public static final Version CURRENT = V_0_12;
 
@@ -248,7 +258,7 @@ public class OrcFile {
     void preFooterWrite(WriterContext context) throws IOException;
   }
 
-  public static enum BloomFilterVersion {
+  public enum BloomFilterVersion {
     // Include both the BLOOM_FILTER and BLOOM_FILTER_UTF8 streams to support
     // both old and new readers.
     ORIGINAL("original"),
@@ -257,7 +267,7 @@ public class OrcFile {
     UTF8("utf8");
 
     private final String id;
-    private BloomFilterVersion(String id) {
+    BloomFilterVersion(String id) {
       this.id = id;
     }
 
@@ -299,6 +309,7 @@ public class OrcFile {
     private double bloomFilterFpp;
     private BloomFilterVersion bloomFilterVersion;
     private PhysicalWriter physicalWriter;
+    private WriterVersion writerVersion = CURRENT_WRITER;
 
     protected WriterOptions(Properties tableProperties, Configuration conf) {
       configuration = conf;
@@ -508,6 +519,20 @@ public class OrcFile {
       return this;
     }
 
+    /**
+     * Manually set the writer version.
+     * This is an internal API.
+     * @param version the version to write
+     * @return this
+     */
+    protected WriterOptions writerVersion(WriterVersion version) {
+      if (version == WriterVersion.FUTURE) {
+        throw new IllegalArgumentException("Can't write a future version.");
+      }
+      this.writerVersion = version;
+      return this;
+    }
+
     public boolean getBlockPadding() {
       return blockPaddingValue;
     }
@@ -587,6 +612,10 @@ public class OrcFile {
     public PhysicalWriter getPhysicalWriter() {
       return physicalWriter;
     }
+
+    public WriterVersion getWriterVersion() {
+      return writerVersion;
+    }
   }
 
   /**
@@ -642,4 +671,199 @@ public class OrcFile {
     return new WriterImpl(fs, path, opts);
   }
 
+  /**
+   * Do we understand the version in the reader?
+   * @param path the path of the file
+   * @param reader the ORC file reader
+   * @return is the version understood by this version of the library?
+   */
+  static boolean understandFormat(Path path, Reader reader) {
+    if (reader.getFileVersion() == Version.FUTURE) {
+      LOG.info("Can't merge {} because it has a future version.", path);
+      return false;
+    }
+    if (reader.getWriterVersion() == WriterVersion.FUTURE) {
+      LOG.info("Can't merge {} because it has a future writerVersion.", path);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Is the new reader compatible with the file that is being written?
+   * @param schema the writer schema
+   * @param fileVersion the writer fileVersion
+   * @param writerVersion the writer writerVersion
+   * @param rowIndexStride the row index stride
+   * @param compression the compression that was used
+   * @param userMetadata the user metadata
+   * @param path the new path name for warning messages
+   * @param reader the new reader
+   * @return is the reader compatible with the previous ones?
+   */
+  static boolean readerIsCompatible(TypeDescription schema,
+                                    Version fileVersion,
+                                    WriterVersion writerVersion,
+                                    int rowIndexStride,
+                                    CompressionKind compression,
+                                    Map<String, ByteBuffer> userMetadata,
+                                    Path path,
+                                    Reader reader) {
+    // now we have to check compatibility
+    if (!reader.getSchema().equals(schema)) {
+      LOG.info("Can't merge {} because of different schemas {} vs {}",
+          path, reader.getSchema(), schema);
+      return false;
+    }
+    if (reader.getCompressionKind() != compression) {
+      LOG.info("Can't merge {} because of different compression {} vs {}",
+          path, reader.getCompressionKind(), compression);
+      return false;
+    }
+    if (reader.getFileVersion() != fileVersion) {
+      LOG.info("Can't merge {} because of different file versions {} vs {}",
+          path, reader.getFileVersion(), fileVersion);
+      return false;
+    }
+    if (reader.getWriterVersion() != writerVersion) {
+      LOG.info("Can't merge {} because of different writer versions {} vs {}",
+          path, reader.getWriterVersion(), writerVersion);
+      return false;
+    }
+    if (reader.getRowIndexStride() != rowIndexStride) {
+      LOG.info("Can't merge {} because of different row index strides {} vs {}",
+          path, reader.getRowIndexStride(), rowIndexStride);
+      return false;
+    }
+    for(String key: reader.getMetadataKeys()) {
+      if (userMetadata.containsKey(key)) {
+        ByteBuffer currentValue = userMetadata.get(key);
+        ByteBuffer newValue = reader.getMetadataValue(key);
+        if (!newValue.equals(currentValue)) {
+          LOG.info("Can't merge {} because of different user metadata {}", path,
+              key);
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  static void mergeMetadata(Map<String,ByteBuffer> metadata,
+                            Reader reader) {
+    for(String key: reader.getMetadataKeys()) {
+      metadata.put(key, reader.getMetadataValue(key));
+    }
+  }
+
+  /**
+   * Merges multiple ORC files that all have the same schema to produce
+   * a single ORC file.
+   * The merge will reject files that aren't compatible with the merged file,
+   * so the returned list may be shorter than the input list.
+   * The stripes are copied as serialized byte buffers.
+   * The user metadata are merged and files that disagree on the value
+   * associated with a key will be rejected.
+   *
+   * @param outputPath the output file
+   * @param options the options for writing with although the options related
+   *                to the input files' encodings are overridden
+   * @param inputFiles the list of files to merge
+   * @return the list of files that were successfully merged
+   * @throws IOException if reading an input or writing the output fails
+   */
+  public static List<Path> mergeFiles(Path outputPath,
+                                      WriterOptions options,
+                                      List<Path> inputFiles) throws IOException {
+    Writer output = null;
+    final Configuration conf = options.getConfiguration();
+    try {
+      byte[] buffer = new byte[0];
+      TypeDescription schema = null;
+      CompressionKind compression = null;
+      int bufferSize = 0;
+      Version fileVersion = null;
+      WriterVersion writerVersion = null;
+      int rowIndexStride = 0;
+      List<Path> result = new ArrayList<>(inputFiles.size());
+      Map<String, ByteBuffer> userMetadata = new HashMap<>();
+
+      for (Path input : inputFiles) {
+        FileSystem fs = input.getFileSystem(conf);
+        Reader reader = createReader(input,
+            readerOptions(options.getConfiguration()).filesystem(fs));
+
+        if (!understandFormat(input, reader)) {
+          continue;
+        } else if (schema == null) {
+          // if this is the first file that we are including, grab the values
+          schema = reader.getSchema();
+          compression = reader.getCompressionKind();
+          bufferSize = reader.getCompressionSize();
+          rowIndexStride = reader.getRowIndexStride();
+          fileVersion = reader.getFileVersion();
+          writerVersion = reader.getWriterVersion();
+          options.bufferSize(bufferSize)
+              .version(fileVersion)
+              .writerVersion(writerVersion)
+              .compress(compression)
+              .rowIndexStride(rowIndexStride)
+              .setSchema(schema);
+          if (compression != CompressionKind.NONE) {
+            options.enforceBufferSize().bufferSize(bufferSize);
+          }
+          mergeMetadata(userMetadata, reader);
+          output = createWriter(outputPath, options);
+        } else if (!readerIsCompatible(schema, fileVersion, writerVersion,
+            rowIndexStride, compression, userMetadata, input, reader)) {
+          continue;
+        } else {
+          mergeMetadata(userMetadata, reader);
+          if (bufferSize < reader.getCompressionSize()) {
+            bufferSize = reader.getCompressionSize();
+            ((WriterImpl) output).increaseCompressionSize(bufferSize);
+          }
+        }
+        List<OrcProto.StripeStatistics> statList =
+            reader.getOrcProtoStripeStatistics();
+        try (FSDataInputStream inputStream = fs.open(input)) {
+          int stripeNum = 0;
+          result.add(input);
+
+          for (StripeInformation stripe : reader.getStripes()) {
+            int length = (int) stripe.getLength();
+            if (buffer.length < length) {
+              buffer = new byte[length];
+            }
+            long offset = stripe.getOffset();
+            inputStream.readFully(offset, buffer, 0, length);
+            output.appendStripe(buffer, 0, length, stripe, statList.get(stripeNum++));
+          }
+        }
+      }
+      if (output != null) {
+        for (Map.Entry<String, ByteBuffer> entry : userMetadata.entrySet()) {
+          output.addUserMetadata(entry.getKey(), entry.getValue());
+        }
+        output.close();
+      }
+      return result;
+    } catch (IOException ioe) {
+      if (output != null) {
+        try {
+          output.close();
+        } catch (Throwable t) {
+          // PASS
+        }
+        try {
+          FileSystem fs = options.getFileSystem() == null ?
+              outputPath.getFileSystem(conf) : options.getFileSystem();
+          fs.delete(outputPath, false);
+        } catch (Throwable t) {
+          // PASS
+        }
+      }
+      throw ioe;
+    }
+  }
 }
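
For reference, the new merge entry point above can be invoked like this (a
minimal sketch with imports elided; the paths are hypothetical):

    Configuration conf = new Configuration();
    List<Path> inputs = Arrays.asList(new Path("/tmp/part-0.orc"),
                                      new Path("/tmp/part-1.orc"));
    // Only inputs whose schema, compression, file/writer versions, row index
    // stride, and user metadata are compatible are copied; the return value
    // lists the files that actually made it into the merged file.
    List<Path> merged = OrcFile.mergeFiles(new Path("/tmp/merged.orc"),
        OrcFile.writerOptions(conf), inputs);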

http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/java/org/apache/orc/Writer.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/Writer.java b/java/core/src/java/org/apache/orc/Writer.java
index 596e14e..b496594 100644
--- a/java/core/src/java/org/apache/orc/Writer.java
+++ b/java/core/src/java/org/apache/orc/Writer.java
@@ -20,6 +20,7 @@ package org.apache.orc;
 
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -31,7 +32,7 @@ import org.apache.orc.TypeDescription;
 /**
  * The interface for writing ORC files.
  */
-public interface Writer {
+public interface Writer extends Closeable {
 
   /**
    * Get the schema for this writer
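
Because Writer now extends java.io.Closeable, a writer can be managed with
try-with-resources (a sketch, assuming conf, path, schema, and batch are set
up elsewhere):

    try (Writer writer = OrcFile.createWriter(path,
             OrcFile.writerOptions(conf).setSchema(schema))) {
      writer.addRowBatch(batch);
    } // close() runs automatically, flushing the footer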

http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 48a0b42..17c73ff 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -204,9 +204,6 @@ public class PhysicalFsWriter implements PhysicalWriter {
   public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
     builder.setFooterLength(footerLength);
     builder.setMetadataLength(metadataLength);
-    if (compress != CompressionKind.NONE) {
-      builder.setCompressionBlockSize(bufferSize);
-    }
     OrcProto.PostScript ps = builder.build();
     // need to write this uncompressed
     long startPosn = rawWriter.getPos();

http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index c24920d..9c8c06b 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -187,16 +187,22 @@ public class ReaderImpl implements Reader {
     return types;
   }
 
-  @Override
-  public OrcFile.Version getFileVersion() {
+  public static OrcFile.Version getFileVersion(List<Integer> versionList) {
+    if (versionList == null || versionList.isEmpty()) {
+      return OrcFile.Version.V_0_11;
+    }
     for (OrcFile.Version version: OrcFile.Version.values()) {
-      if ((versionList != null && !versionList.isEmpty()) &&
-          version.getMajor() == versionList.get(0) &&
+      if (version.getMajor() == versionList.get(0) &&
           version.getMinor() == versionList.get(1)) {
         return version;
       }
     }
-    return OrcFile.Version.V_0_11;
+    return OrcFile.Version.FUTURE;
+  }
+
+  @Override
+  public OrcFile.Version getFileVersion() {
+    return getFileVersion(versionList);
   }
 
   @Override
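
The refactored mapping can now be called statically, and unrecognized
version pairs map to FUTURE instead of silently falling back to V_0_11
(the version lists here are illustrative):

    ReaderImpl.getFileVersion(null);                    // -> Version.V_0_11
    ReaderImpl.getFileVersion(Arrays.asList(0, 12));    // -> Version.V_0_12
    ReaderImpl.getFileVersion(Arrays.asList(9999, 0));  // -> Version.FUTURE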

http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index d9140e8..2779212 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -107,10 +107,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
   private final int rowIndexStride;
   private final CompressionKind compress;
   private final CompressionCodec codec;
-  private final int bufferSize;
+  private int bufferSize;
   private final long blockSize;
   private final TypeDescription schema;
   private final PhysicalWriter physicalWriter;
+  private final OrcFile.WriterVersion writerVersion;
 
   private int columnCount;
   private long rowCount = 0;
@@ -145,6 +146,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     this.conf = opts.getConfiguration();
     this.callback = opts.getCallback();
     this.schema = opts.getSchema();
+    this.writerVersion = opts.getWriterVersion();
     bloomFilterVersion = opts.getBloomFilterVersion();
     if (callback != null) {
       callbackContext = new OrcFile.WriterContext(){
@@ -211,6 +213,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     return estBufferSize > bs ? bs : estBufferSize;
   }
 
+  /**
+   * Increase the buffer size for this writer.
+   * This is an internal API and should only be called by the
+   * ORC file merger.
+   * @param newSize the new buffer size.
+   */
+  public void increaseCompressionSize(int newSize) {
+    if (newSize > bufferSize) {
+      bufferSize = newSize;
+    }
+  }
+
   private static int getClosestBufferSize(int estBufferSize) {
     final int kb4 = 4 * 1024;
     final int kb8 = 8 * 1024;
@@ -2736,7 +2750,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
             .setMagic(OrcFile.MAGIC)
             .addVersion(version.getMajor())
             .addVersion(version.getMinor())
-            .setWriterVersion(OrcFile.CURRENT_WRITER.getId());
+            .setWriterVersion(writerVersion.getId());
     if (compress != CompressionKind.NONE) {
       builder.setCompressionBlockSize(bufferSize);
     }
@@ -2864,6 +2878,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     checkArgument(stripeStatistics != null,
         "Stripe statistics must not be null");
 
+    rowsInStripe = stripeInfo.getNumberOfRows();
     // update stripe information
     OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation
         .newBuilder()
@@ -2883,6 +2898,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
     stripes.add(dirEntry.build());
 
     // reset it after writing the stripe
+    rowCount += rowsInStripe;
     rowsInStripe = 0;
   }
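
With this change, stripes copied via appendStripe finally contribute to the
writer's row count: the count is taken from the incoming stripe information
and rolled into rowCount when the directory entry is recorded. A sketch of
the effect (writer setup elided; stripeStats stands in for the source
file's stripe statistics):

    // before: a file built purely from appendStripe reported 0 rows
    // after:  each call adds stripe.getNumberOfRows() to the footer's total
    output.appendStripe(buffer, 0, length, stripe, stripeStats);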
 

http://git-wip-us.apache.org/repos/asf/orc/blob/33bd6060/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 7df521d..7801156 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -20,6 +20,7 @@ package org.apache.orc;
 
 import com.google.common.collect.Lists;
 
+import org.apache.orc.impl.ReaderImpl;
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,12 +52,15 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.net.URL;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -2944,4 +2948,170 @@ public class TestVectorOrcFile {
     assertEquals(OrcFile.WriterVersion.ORIGINAL, OrcFile.WriterVersion.from(0));
     assertEquals(OrcFile.WriterVersion.HIVE_4243, OrcFile.WriterVersion.from(2));
   }
+
+  /**
+   * Test whether the file versions are translated correctly
+   * @throws Exception
+   */
+  @Test
+  public void testFileVersion() throws Exception {
+    assertEquals(OrcFile.Version.V_0_11, ReaderImpl.getFileVersion(null));
+    assertEquals(OrcFile.Version.V_0_11, ReaderImpl.getFileVersion(new ArrayList<Integer>()));
+    assertEquals(OrcFile.Version.V_0_11,
+        ReaderImpl.getFileVersion(Arrays.asList(new Integer[]{0, 11})));
+    assertEquals(OrcFile.Version.V_0_12,
+        ReaderImpl.getFileVersion(Arrays.asList(new Integer[]{0, 12})));
+    assertEquals(OrcFile.Version.FUTURE,
+        ReaderImpl.getFileVersion(Arrays.asList(new Integer[]{9999, 0})));
+  }
+
+  @Test
+  public void testMergeUnderstood() throws Exception {
+    Path p = new Path("test.orc");
+    Reader futureVersion = Mockito.mock(Reader.class);
+    Mockito.when(futureVersion.getFileVersion()).thenReturn(OrcFile.Version.FUTURE);
+    Mockito.when(futureVersion.getWriterVersion()).thenReturn(OrcFile.WriterVersion.HIVE_4243);
+    assertEquals(false, OrcFile.understandFormat(p, futureVersion));
+    Reader futureWriter = Mockito.mock(Reader.class);
+    Mockito.when(futureWriter.getFileVersion()).thenReturn(OrcFile.Version.V_0_11);
+    Mockito.when(futureWriter.getWriterVersion()).thenReturn(OrcFile.WriterVersion.FUTURE);
+    assertEquals(false, OrcFile.understandFormat(p, futureWriter));
+    Reader current = Mockito.mock(Reader.class);
+    Mockito.when(current.getFileVersion()).thenReturn(OrcFile.Version.CURRENT);
+    Mockito.when(current.getWriterVersion()).thenReturn(OrcFile.CURRENT_WRITER);
+    assertEquals(true, OrcFile.understandFormat(p, current));
+  }
+
+  static ByteBuffer fromString(String s) {
+    return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
+  }
+
+  static byte[] fromInt(int x) {
+    return Integer.toHexString(x).getBytes(StandardCharsets.UTF_8);
+  }
+
+  @Test
+  public void testMerge() throws Exception {
+    Path input1 = new Path(workDir, "TestVectorOrcFile.testMerge1.orc");
+    fs.delete(input1, false);
+    Path input2 = new Path(workDir, "TestVectorOrcFile.testMerge2.orc");
+    fs.delete(input2, false);
+    Path input3 = new Path(workDir, "TestVectorOrcFile.testMerge3.orc");
+    fs.delete(input3, false);
+    TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
+    // change all of the options away from default to find anything we
+    // don't copy to the merged file
+    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf)
+        .setSchema(schema)
+        .compress(CompressionKind.LZO)
+        .enforceBufferSize()
+        .bufferSize(20*1024)
+        .rowIndexStride(1000)
+        .version(OrcFile.Version.V_0_11)
+        .writerVersion(OrcFile.WriterVersion.HIVE_8732);
+
+    Writer writer = OrcFile.createWriter(input1, opts);
+    VectorizedRowBatch batch = schema.createRowBatch();
+    batch.size = 1024;
+    for(int r=0; r < 1024; ++r) {
+      ((LongColumnVector) batch.cols[0]).vector[r] = r;
+      ((BytesColumnVector) batch.cols[1]).setVal(r, fromInt(r));
+    }
+    writer.addRowBatch(batch);
+    writer.addUserMetadata("a", fromString("foo"));
+    writer.addUserMetadata("b", fromString("bar"));
+    writer.close();
+
+    // increase the buffer size to 30k
+    opts.bufferSize(30*1024);
+    writer = OrcFile.createWriter(input2, opts);
+    batch.size = 1024;
+    for(int r=0; r < 1024; ++r) {
+      ((LongColumnVector) batch.cols[0]).vector[r] = 2 * r;
+      ((BytesColumnVector) batch.cols[1]).setVal(r, fromInt(2 * r));
+    }
+    writer.addRowBatch(batch);
+    writer.addUserMetadata("a", fromString("foo"));
+    writer.addUserMetadata("c", fromString("baz"));
+    writer.close();
+
+    // decrease the buffer size to 10k
+    opts.bufferSize(10*1024);
+    writer = OrcFile.createWriter(input3, opts);
+    batch.size = 1024;
+    for(int r=0; r < 1024; ++r) {
+      ((LongColumnVector) batch.cols[0]).vector[r] = 3 * r;
+      ((BytesColumnVector) batch.cols[1]).setVal(r, fromInt(3 * r));
+    }
+    writer.addRowBatch(batch);
+    writer.addUserMetadata("c", fromString("baz"));
+    writer.addUserMetadata("d", fromString("bat"));
+    writer.close();
+
+    Path output1 = new Path(workDir, "TestVectorOrcFile.testMerge.out1.orc");
+    fs.delete(output1, false);
+    List<Path> paths = OrcFile.mergeFiles(output1,
+        OrcFile.writerOptions(conf), Arrays.asList(input1, input2, input3));
+    assertEquals(3, paths.size());
+    Reader reader = OrcFile.createReader(output1, OrcFile.readerOptions(conf));
+    assertEquals(3 * 1024, reader.getNumberOfRows());
+    assertEquals(CompressionKind.LZO, reader.getCompressionKind());
+    assertEquals(30 * 1024, reader.getCompressionSize());
+    assertEquals(1000, reader.getRowIndexStride());
+    assertEquals(OrcFile.Version.V_0_11, reader.getFileVersion());
+    assertEquals(OrcFile.WriterVersion.HIVE_8732, reader.getWriterVersion());
+    assertEquals(3, reader.getStripes().size());
+    assertEquals(4, reader.getMetadataKeys().size());
+    assertEquals(fromString("foo"), reader.getMetadataValue("a"));
+    assertEquals(fromString("bar"), reader.getMetadataValue("b"));
+    assertEquals(fromString("baz"), reader.getMetadataValue("c"));
+    assertEquals(fromString("bat"), reader.getMetadataValue("d"));
+
+    TypeDescription schema4 = TypeDescription.fromString("struct<a:int>");
+    Path input4 = new Path(workDir, "TestVectorOrcFile.testMerge4.orc");
+    fs.delete(input4, false);
+    opts.setSchema(schema4);
+    writer = OrcFile.createWriter(input4, opts);
+    batch = schema4.createRowBatch();
+    batch.size = 1024;
+    for(int r=0; r < 1024; ++r) {
+      ((LongColumnVector) batch.cols[0]).vector[r] = 4 * r;
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Path input5 = new Path(workDir, "TestVectorOrcFile.testMerge5.orc");
+    fs.delete(input5, false);
+    opts.setSchema(schema)
+        .compress(CompressionKind.NONE)
+        .bufferSize(100*1024);
+    writer = OrcFile.createWriter(input5, opts);
+    batch = schema.createRowBatch();
+    batch.size = 1024;
+    for(int r=0; r < 1024; ++r) {
+      ((LongColumnVector) batch.cols[0]).vector[r] = 4 * r;
+      ((BytesColumnVector) batch.cols[1]).setVal(r, fromInt(5 * r));
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Path output2 = new Path(workDir, "TestVectorOrcFile.testMerge.out2.orc");
+    fs.delete(output2, false);
+    paths = OrcFile.mergeFiles(output2, OrcFile.writerOptions(conf),
+        Arrays.asList(input3, input4, input1, input5));
+    assertEquals(2, paths.size());
+    reader = OrcFile.createReader(output2, OrcFile.readerOptions(conf));
+    assertEquals(2 * 1024, reader.getNumberOfRows());
+    assertEquals(CompressionKind.LZO, reader.getCompressionKind());
+    assertEquals(20 * 1024, reader.getCompressionSize());
+    assertEquals(1000, reader.getRowIndexStride());
+    assertEquals(OrcFile.Version.V_0_11, reader.getFileVersion());
+    assertEquals(OrcFile.WriterVersion.HIVE_8732, reader.getWriterVersion());
+    assertEquals(2, reader.getStripes().size());
+    assertEquals(4, reader.getMetadataKeys().size());
+    assertEquals(fromString("foo"), reader.getMetadataValue("a"));
+    assertEquals(fromString("bar"), reader.getMetadataValue("b"));
+    assertEquals(fromString("baz"), reader.getMetadataValue("c"));
+    assertEquals(fromString("bat"), reader.getMetadataValue("d"));
+  }
 }