You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/02/20 00:16:58 UTC

svn commit: r912030 - in /hadoop/avro/trunk: ./ lang/java/src/java/org/apache/avro/file/ lang/java/src/test/java/org/apache/avro/

Author: cutting
Date: Fri Feb 19 23:16:57 2010
New Revision: 912030

URL: http://svn.apache.org/viewvc?rev=912030&view=rev
Log:
AVRO-414.  Add Java support for concatenating and appending data files.  Contributed by Scott Carey.

Added:
    hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
    hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Feb 19 23:16:57 2010
@@ -95,6 +95,9 @@
 
     AVRO-136. Add support for building/releasing python eggs (hammer)
 
+    AVRO-414. Add Java support for concatenating and appending data
+    files. (Scott Carey via cutting)
+
   IMPROVEMENTS
 
     AVRO-157. Changes from code review comments for C++. (sbanacho)

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java Fri Feb 19 23:16:57 2010
@@ -17,10 +17,8 @@
  */
 package org.apache.avro.file;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-
-import org.apache.avro.io.BinaryDecoder;
+import java.nio.ByteBuffer;
 
 /** 
  * Interface for Avro-supported compression codecs for data files.
@@ -30,8 +28,23 @@
 abstract class Codec {
   /** Name of the codec; written to the file's metadata. */
   abstract String getName();
-  /** Compresses the input data and return the result as a ByteArrayOutputStream */
-  abstract ByteArrayOutputStream compress(ByteArrayOutputStream data) throws IOException;
-  /** Returns a decoder on the uncompressed data. */
-  abstract BinaryDecoder decompress(byte[] compressedData, int offset, int length) throws IOException;
+  /** Compresses the input data */
+  abstract ByteBuffer compress(ByteBuffer uncompressedData) throws IOException;
+  /** Decompress the data  */
+  abstract ByteBuffer decompress(ByteBuffer compressedData) throws IOException;
+  /** 
+   * Codecs must implement an equals() method.  Two codecs, A and B are equal
+   * if: the result of A and B decompressing content compressed by A is the same
+   * AND the result of A and B decompressing content compressed by B is the same
+   **/
+  @Override
+  public abstract boolean equals(Object other);
+  /** 
+   * Codecs must implement a hashCode() method that is consistent with equals().*/
+  @Override
+  public abstract int hashCode();
+  @Override
+  public String toString() {
+    return getName();
+  }
 }

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java Fri Feb 19 23:16:57 2010
@@ -67,4 +67,10 @@
     return REGISTERED.put(name, c);
   }
   
+  @Override
+  public String toString() {
+    Codec instance = this.createInstance();
+    return instance.toString();
+  }
+  
 }

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java Fri Feb 19 23:16:57 2010
@@ -80,20 +80,22 @@
       syncBuffer[i++%SYNC_SIZE] = (byte)b;
     } while (b != -1);
     } catch (EOFException e) {
+      // fall through
+    }
+    // if no match or EOF set start to the end position
       blockStart = sin.tell();
+    //System.out.println("block start location after EOF: " + blockStart );
       return;
   }
-  }
 
   @Override
-  void skipSync() throws IOException {            // note block start
-    super.skipSync();
+  protected void blockFinished() throws IOException {
     blockStart = sin.tell() - vin.inputStream().available();
   }
 
   /** Return true if past the next synchronization point after a position. */ 
   public boolean pastSync(long position) throws IOException {
-    return blockStart >= Math.min(sin.length(), position+SYNC_SIZE);
+    return ((blockStart >= position+SYNC_SIZE)||(blockStart >= sin.length()));
   }
 
   private static class SeekableInputStream extends InputStream 
@@ -161,7 +163,7 @@
       long remaining = (in.length() - in.tell());
       return (remaining > Integer.MAX_VALUE) ? Integer.MAX_VALUE
           : (int) remaining;
-}
+    }
   }
 }
 

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java Fri Feb 19 23:16:57 2010
@@ -42,6 +42,8 @@
 
   private Schema schema;
   private DatumReader<D> reader;
+  private long blockSize;
+  private boolean availableBlock = false;
 
   /** Decoder on raw input stream.  (Used for metadata.) */
   BinaryDecoder vin;
@@ -104,7 +106,7 @@
     reader.setSchema(schema);
   }
 
-  private Codec resolveCodec() {
+  Codec resolveCodec() {
     String codecStr = getMetaString(DataFileConstants.CODEC);
     if (codecStr != null) {
       return CodecFactory.fromString(codecStr).createInstance();
@@ -142,7 +144,7 @@
    * pointer into the file. */
   public Iterator<D> iterator() { return this; }
 
-  private byte[] block = null;
+  private DataBlock block = null;
   /** True if more entries remain in this file. */
   public boolean hasNext() {
     try {
@@ -154,19 +156,14 @@
             throw new IOException("Block read partially, the data may be corrupt");
           }
         }
-        blockRemaining = vin.readLong();          // read block count
-        long compressedSize = vin.readLong();     // read block size
-        if (compressedSize > Integer.MAX_VALUE ||
-            compressedSize < 0) {
-          throw new IOException("Block size invalid or too large for this " +
-            "implementation: " + compressedSize);
+        if (hasNextBlock()) {
+          block = nextBlock(block);
+          ByteBuffer blockBuffer = ByteBuffer.wrap(block.data, 0, block.blockSize);
+          blockBuffer = codec.decompress(blockBuffer);
+          datumIn = DecoderFactory.defaultFactory().createBinaryDecoder(
+              blockBuffer.array(), blockBuffer.arrayOffset() +
+              blockBuffer.position(), blockBuffer.remaining(), datumIn);
         }
-        if (block == null || block.length < (int) compressedSize) {
-          block = new byte[(int) compressedSize];
-        }
-         // throws if it can't read the size requested
-        vin.readFixed(block, 0, (int)compressedSize); 
-         datumIn = codec.decompress(block, 0, (int) compressedSize);
       }
       return blockRemaining != 0;
     } catch (EOFException e) {                    // at EOF
@@ -195,11 +192,53 @@
     if (!hasNext())
       throw new NoSuchElementException();
     D result = reader.read(reuse, datumIn);
-    if (--blockRemaining == 0)
-      skipSync();
+    if (0 == --blockRemaining) {
+      blockFinished();
+    }
     return result;
   }
 
+  protected void blockFinished() throws IOException {
+    // nothing for the stream impl
+  }
+
+  boolean hasNextBlock() {
+    try {
+      if (availableBlock) return true;
+      if (vin.isEnd()) return false;
+      blockRemaining = vin.readLong();      // read block count
+      blockSize = vin.readLong();           // read block size
+      if (blockSize > Integer.MAX_VALUE ||
+          blockSize < 0) {
+        throw new IOException("Block size invalid or too large for this " +
+          "implementation: " + blockSize);
+      }
+      availableBlock = true;
+      return true;
+    } catch (EOFException eof) {
+      return false;
+    } catch (IOException e) {
+      throw new AvroRuntimeException(e);
+    }
+  }
+
+  DataBlock nextBlock(DataBlock reuse) throws IOException {
+    if (!hasNextBlock()) {
+      throw new NoSuchElementException();
+    }
+    if (reuse == null || reuse.data.length < (int) blockSize) {
+      reuse = new DataBlock(blockRemaining, (int) blockSize);
+    } else {
+      reuse.numEntries = blockRemaining;
+      reuse.blockSize = (int)blockSize;
+    }
+    // throws if it can't read the size requested
+    vin.readFixed(reuse.data, 0, reuse.blockSize);
+    skipSync();
+    availableBlock = false;
+    return reuse;
+  }
+
   void skipSync() throws IOException {
     vin.readFixed(syncBuffer);
     if (!Arrays.equals(syncBuffer, sync))
@@ -214,5 +253,16 @@
     vin.inputStream().close();
   }
 
+  static class DataBlock {
+    byte[] data;
+    long numEntries;
+    int blockSize;
+    DataBlock(long numEntries, int blockSize) {
+      this.data = new byte[blockSize];
+      this.numEntries = numEntries;
+      this.blockSize = blockSize;
+    }
+  }
+
 }
 

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java Fri Feb 19 23:16:57 2010
@@ -30,6 +30,7 @@
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
@@ -39,6 +40,7 @@
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream.DataBlock;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
@@ -63,7 +65,7 @@
 
   private long blockCount;                       // # entries in current block
 
-  private ByteArrayOutputStream buffer;
+  private NonCopyingByteArrayOutputStream buffer;
   private Encoder bufOut;
 
   private byte[] sync;                          // 16 random bytes
@@ -181,12 +183,12 @@
     this.out = new BufferedFileOutputStream(outs);
     this.vout = new BinaryEncoder(out);
     dout.setSchema(schema);
-    this.buffer = new ByteArrayOutputStream(
+    buffer = new NonCopyingByteArrayOutputStream(
         Math.min((int)(syncInterval * 1.25), Integer.MAX_VALUE/2 -1));
+    this.bufOut = new BinaryEncoder(buffer);
     if (this.codec == null) {
       this.codec = CodecFactory.nullCodec().createInstance();
     }
-    this.bufOut = new BinaryEncoder(buffer);
     this.isOpen = true;
   }
 
@@ -249,16 +251,72 @@
       writeBlock();
   }
 
+  /**
+   * Appends data from another file.  otherFile must have the same schema.
+   * Data blocks will be copied without de-serializing data.  If the codecs
+   * of the two files are compatible, data blocks are copied directly without
+   * decompression.  If the codecs are not compatible, blocks from otherFile
+   * are uncompressed and then compressed using this file's codec.
+   * <p/>
+   * If the recompress flag is set all blocks are decompressed and then compressed
+   * using this file's codec.  This is useful when the two files have compatible
+   * compression codecs but different codec options.  For example, one might
+   * append a file compressed with deflate at compression level 1 to a file with
+   * deflate at compression level 7.  If <i>recompress</i> is false, blocks
+   * will be copied without changing the compression level.  If true, they will
+   * be converted to the new compression level.
+   * @param otherFile
+   * @param recompress
+   * @throws IOException
+   */
+  public void appendAllFrom(DataFileStream<D> otherFile, boolean recompress) throws IOException {
+    assertOpen();
+    // make sure other file has same schema
+    Schema otherSchema = otherFile.getSchema();
+    if (!this.schema.equals(otherSchema)) {
+      throw new IOException("Schema from file " + otherFile + " does not match");
+    }
+    // flush anything written so far
+    writeBlock();
+    Codec otherCodec = otherFile.resolveCodec();
+    DataBlock nextBlockRaw = null;
+    if (codec.equals(otherCodec) && !recompress) {
+      // copy raw bytes
+      while(otherFile.hasNextBlock()) {
+        nextBlockRaw = otherFile.nextBlock(nextBlockRaw);
+        writeRawBlock(nextBlockRaw);
+      }
+    } else {
+      while(otherFile.hasNextBlock()) {
+        nextBlockRaw = otherFile.nextBlock(nextBlockRaw);
+        ByteBuffer uncompressedData = otherCodec.decompress(ByteBuffer.wrap(
+            nextBlockRaw.data, 0, nextBlockRaw.blockSize));
+        ByteBuffer compressed = codec.compress(uncompressedData);
+        nextBlockRaw.data = compressed.array();
+        nextBlockRaw.blockSize = compressed.remaining();
+        writeRawBlock(nextBlockRaw);
+      }
+    }
+  }
+  
+  private void writeRawBlock(DataBlock rawBlock) throws IOException {
+    vout.writeLong(rawBlock.numEntries);
+    vout.writeLong(rawBlock.blockSize);
+    vout.writeFixed(rawBlock.data, 0, rawBlock.blockSize);
+    vout.writeFixed(sync);
+  }
+  
   private void writeBlock() throws IOException {
     if (blockCount > 0) {
       vout.writeLong(blockCount);
-      ByteArrayOutputStream block = codec.compress(buffer);
-      vout.writeLong(block.size());
-      vout.flush(); //vout may be buffered, flush before writing to out
-      block.writeTo(out);
+      ByteBuffer uncompressed = buffer.getByteArrayAsByteBuffer();
+      ByteBuffer block = codec.compress(uncompressed);
+      vout.writeLong(block.remaining());
+      vout.writeFixed(block.array(), block.position() + block.arrayOffset(), 
+          block.remaining());
       buffer.reset();
       blockCount = 0;
-      out.write(sync);
+      vout.writeFixed(sync);
     }
   }
 
@@ -274,6 +332,7 @@
   /** Flush the current state of the file. */
   public void flush() throws IOException {
     sync();
+    vout.flush();
     out.flush();
   }
 
@@ -303,4 +362,13 @@
     public long tell() { return position+count; }
   }
 
+  static class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream {
+    NonCopyingByteArrayOutputStream(int initialSize) {
+      super(initialSize);
+    }
+    ByteBuffer getByteArrayAsByteBuffer() {
+      return ByteBuffer.wrap(buf, 0, count);
+    }
+  }
+
 }

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java Fri Feb 19 23:16:57 2010
@@ -18,16 +18,16 @@
 package org.apache.avro.file;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.Inflater;
 import java.util.zip.InflaterInputStream;
 
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.file.DataFileWriter.NonCopyingByteArrayOutputStream;
 
 /** 
  * Implements DEFLATE (RFC1951) compression and decompression. 
@@ -54,8 +54,11 @@
     }
   }
 
-  ByteArrayOutputStream compressionBuffer;
+  NonCopyingByteArrayOutputStream compressionBuffer;
   private Deflater deflater;
+  private Inflater inflater;
+  //currently only do 'nowrap' -- RFC 1951, not zlib
+  private boolean nowrap = true; 
   private int compressionLevel;
 
   public DeflateCodec(int compressionLevel) {
@@ -68,32 +71,75 @@
   }
 
   @Override
-  ByteArrayOutputStream compress(ByteArrayOutputStream buffer) 
-    throws IOException {
+  ByteBuffer compress(ByteBuffer data) throws IOException {
     if (compressionBuffer == null) {
-      compressionBuffer = new ByteArrayOutputStream(buffer.size());
-    } else {
-      compressionBuffer.reset();
+      compressionBuffer = new NonCopyingByteArrayOutputStream(
+          data.remaining());
     }
     if (deflater == null) {
-      deflater = new Deflater(compressionLevel, true);
+      deflater = new Deflater(compressionLevel, nowrap);
     }
     // Pass output through deflate
     DeflaterOutputStream deflaterStream = 
       new DeflaterOutputStream(compressionBuffer, deflater);
-    buffer.writeTo(deflaterStream);
+    deflaterStream.write(data.array(),
+        data.position() + data.arrayOffset(),
+        data.limit() + data.arrayOffset());
     deflaterStream.finish();
+    ByteBuffer result = compressionBuffer.getByteArrayAsByteBuffer();
     deflater.reset();
-    return compressionBuffer;
+    compressionBuffer.reset();
+    return result;
   }
 
   @Override
-  BinaryDecoder decompress(byte[] data, int offset, int length)
-      throws IOException {
-    Inflater inflater = new Inflater(true);
+  ByteBuffer decompress(ByteBuffer data) throws IOException {
+    if (compressionBuffer == null) {
+      compressionBuffer = new NonCopyingByteArrayOutputStream(
+          data.remaining());
+    }
+    if (inflater == null) {
+      inflater = new Inflater(nowrap);
+    }
     InputStream uncompressed = new InflaterInputStream(
-        new ByteArrayInputStream(data, offset, length), inflater);
-    return DecoderFactory.defaultFactory().createBinaryDecoder(uncompressed, null);
+        new ByteArrayInputStream(data.array(),
+            data.position() + data.arrayOffset(),
+            data.remaining()), inflater);
+    int read;
+    byte[] buff = new byte[2048];
+    try {
+      while (true) {
+        read = uncompressed.read(buff);
+        if (read < 0) break;
+        compressionBuffer.write(buff, 0, read);
+      } 
+    } catch (EOFException e) {
+      // sometimes InflaterInputStream.read
+      // throws this instead of returning -1
+    }
+    ByteBuffer result = compressionBuffer.getByteArrayAsByteBuffer();
+    inflater.reset();
+    compressionBuffer.reset();
+    return result;
+  }
+
+  @Override
+  public int hashCode() {
+    return nowrap ? 0 : 1;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (getClass() != obj.getClass())
+      return false;
+    DeflateCodec other = (DeflateCodec)obj;
+    return (this.nowrap == other.nowrap);
   }
 
+  @Override
+  public String toString() {
+    return getName() + "-" + compressionLevel;
+  }
 }

Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java Fri Feb 19 23:16:57 2010
@@ -17,11 +17,8 @@
  */
 package org.apache.avro.file;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.BinaryDecoder;
+import java.nio.ByteBuffer;
 
 /** Implements "null" (pass through) codec. */
 final class NullCodec extends Codec {
@@ -29,13 +26,12 @@
   private static final NullCodec INSTANCE = new NullCodec();
 
   static class Option extends CodecFactory {
-
     @Override
     protected Codec createInstance() {
       return INSTANCE;
     }
-    
   }
+
   /** No options available for NullCodec. */
   public static final CodecFactory OPTION = new Option();
 
@@ -45,15 +41,24 @@
   }
 
   @Override
-  ByteArrayOutputStream compress(ByteArrayOutputStream buffer)
-      throws IOException {
+  ByteBuffer compress(ByteBuffer buffer) throws IOException {
     return buffer;
   }
-  
+
+  @Override
+  ByteBuffer decompress(ByteBuffer data) throws IOException {
+    return data;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other)
+      return true;
+    return (this.getClass() == other.getClass());
+  }
+
   @Override
-  BinaryDecoder decompress(byte[] data, int offset, int length)
-      throws IOException {
-    return DecoderFactory.defaultFactory().createBinaryDecoder(
-        data, offset, length, null);
+  public int hashCode() {
+    return 2;
   }
 }

Added: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java?rev=912030&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java (added)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java Fri Feb 19 23:16:57 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class TestDataFileConcat {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestDataFileConcat.class);
+
+  CodecFactory codec = null;
+  CodecFactory codec2 = null;
+  boolean recompress;
+  public TestDataFileConcat(CodecFactory codec, CodecFactory codec2, Boolean recompress) {
+    this.codec = codec;
+    this.codec2 = codec2;
+    this.recompress = recompress;
+    LOG.info("Testing concatenating files, " + codec2 + " into " + codec + 
+        " with recompress=" + recompress);
+  }
+
+  @Parameters
+  public static List<Object[]> codecs() {
+    List<Object[]> r = new ArrayList<Object[]>();
+    r.add(new Object[] { null , null, false});
+    r.add(new Object[] { null , null, true});
+    r.add(new Object[]
+        { CodecFactory.deflateCodec(1), CodecFactory.deflateCodec(6), false });
+    r.add(new Object[]
+        { CodecFactory.deflateCodec(1), CodecFactory.deflateCodec(6), true });
+    r.add(new Object[]
+        { CodecFactory.deflateCodec(3), CodecFactory.nullCodec(), false });
+    r.add(new Object[]
+        { CodecFactory.nullCodec(), CodecFactory.deflateCodec(6), false });
+    return r;
+  }
+
+  private static final int COUNT =
+    Integer.parseInt(System.getProperty("test.count", "200"));
+  private static final boolean VALIDATE =
+    !"false".equals(System.getProperty("test.validate", "true"));
+  private static final File DIR
+    = new File(System.getProperty("test.dir", "/tmp"));
+  private static final long SEED = System.currentTimeMillis();
+
+  private static final String SCHEMA_JSON =
+    "{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["
+    +"{\"name\":\"stringField\", \"type\":\"string\"},"
+    +"{\"name\":\"longField\", \"type\":\"long\"}]}";
+  private static final Schema SCHEMA = Schema.parse(SCHEMA_JSON);
+
+  private File makeFile(String name) {
+    return new File(DIR, "test-" + name + ".avro");
+  }
+
+  @Test
+  public void testConcateateFiles() throws IOException {
+    File file1 = makeFile((codec == null ? "null" : codec.toString()) + "-A");
+    File file2 = makeFile((codec2 == null ? "null" : codec2.toString()) + "-B");
+    DataFileWriter<Object> writer =
+      new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+      .setSyncInterval(500);
+    if (codec != null) {
+      writer.setCodec(codec);
+    }
+    writer.create(SCHEMA, file1);
+    try {
+      for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
+        writer.append(datum);
+      }
+    } finally {
+      writer.close();
+    }
+    DataFileWriter<Object> writer2 =
+      new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+      .setSyncInterval(500);
+    if (codec2 != null) {
+      writer2.setCodec(codec2);
+    }
+    writer2.create(SCHEMA, file2);
+    try {
+      for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
+        writer2.append(datum);
+      }
+    } finally {
+      writer2.close();
+    }
+    DataFileWriter<Object> concatinto = 
+      new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+      .setSyncInterval(500);
+    concatinto.appendTo(file1);
+    DataFileReader<Object> concatfrom =
+      new DataFileReader<Object>(file2, new GenericDatumReader<Object>());
+    concatinto.appendAllFrom(concatfrom, recompress);
+    concatinto.close();
+    
+    DataFileReader<Object> concat =
+      new DataFileReader<Object>(file1, new GenericDatumReader<Object>());
+   
+    try {
+      Object datum = null;
+      if (VALIDATE) {
+        for (Object expected : new RandomData(SCHEMA, COUNT, SEED)) {
+          datum = concat.next(datum);
+          assertEquals(expected, datum);
+        }
+        for (Object expected : new RandomData(SCHEMA, COUNT, SEED+1)) {
+          datum = concat.next(datum);
+          assertEquals(expected, datum);
+        }
+      } else {
+        for (int i = 0; i < COUNT*2; i++) {
+          datum = concat.next(datum);
+        }
+      }
+    } finally {
+      concat.close();
+    }
+
+  }
+}