Posted to commits@avro.apache.org by cu...@apache.org on 2010/02/20 00:16:58 UTC
svn commit: r912030 - in /hadoop/avro/trunk: ./ lang/java/src/java/org/apache/avro/file/ lang/java/src/test/java/org/apache/avro/
Author: cutting
Date: Fri Feb 19 23:16:57 2010
New Revision: 912030
URL: http://svn.apache.org/viewvc?rev=912030&view=rev
Log:
AVRO-414. Add Java support for concatenating and appending data files. Contributed by Scott Carey.
Added:
hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
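For context, a minimal usage sketch of what this patch enables (file names are illustrative; appendTo() predates this patch and is exercised by the new test below). It opens an existing file for append, then bulk-copies another file's blocks without deserializing records:

    DataFileWriter<Object> writer =
        new DataFileWriter<Object>(new GenericDatumWriter<Object>());
    writer.appendTo(new File("a.avro"));             // open existing file for append
    DataFileReader<Object> other =
        new DataFileReader<Object>(new File("b.avro"),
                                   new GenericDatumReader<Object>());
    writer.appendAllFrom(other, false);              // copy blocks; recompress=false
    other.close();
    writer.close();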
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Feb 19 23:16:57 2010
@@ -95,6 +95,9 @@
AVRO-136. Add support for building/releasing python eggs (hammer)
+ AVRO-414. Add Java support for concatenating and appending data
+ files. (Scott Carey via cutting)
+
IMPROVEMENTS
AVRO-157. Changes from code review comments for C++. (sbanacho)
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/Codec.java Fri Feb 19 23:16:57 2010
@@ -17,10 +17,8 @@
*/
package org.apache.avro.file;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-
-import org.apache.avro.io.BinaryDecoder;
+import java.nio.ByteBuffer;
/**
* Interface for Avro-supported compression codecs for data files.
@@ -30,8 +28,23 @@
abstract class Codec {
/** Name of the codec; written to the file's metadata. */
abstract String getName();
- /** Compresses the input data and return the result as a ByteArrayOutputStream */
- abstract ByteArrayOutputStream compress(ByteArrayOutputStream data) throws IOException;
- /** Returns a decoder on the uncompressed data. */
- abstract BinaryDecoder decompress(byte[] compressedData, int offset, int length) throws IOException;
+ /** Compresses the input data */
+ abstract ByteBuffer compress(ByteBuffer uncompressedData) throws IOException;
+ /** Decompresses the data */
+ abstract ByteBuffer decompress(ByteBuffer compressedData) throws IOException;
+ /**
+ * Codecs must implement an equals() method. Two codecs, A and B, are equal
+ * if: the result of A and B decompressing content compressed by A is the same,
+ * AND the result of A and B decompressing content compressed by B is the same.
+ **/
+ @Override
+ public abstract boolean equals(Object other);
+ /**
+ * Codecs must implement a hashCode() method that is consistent with equals().*/
+ @Override
+ public abstract int hashCode();
+ @Override
+ public String toString() {
+ return getName();
+ }
}
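The new equals() contract is what lets DataFileWriter.appendAllFrom() (below) decide when a raw block copy is safe. A sketch of the intent, assuming code inside the org.apache.avro.file package (Codec and CodecFactory.createInstance() are package-scoped): deflate at different levels yields mutually decompressable streams, so the codecs compare equal and blocks need no recompression.

    Codec a = CodecFactory.deflateCodec(1).createInstance();
    Codec b = CodecFactory.deflateCodec(7).createInstance();
    // Equal because either can decompress the other's output;
    // DeflateCodec.equals() below compares only the nowrap flag.
    assert a.equals(b);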
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/CodecFactory.java Fri Feb 19 23:16:57 2010
@@ -67,4 +67,10 @@
return REGISTERED.put(name, c);
}
+ @Override
+ public String toString() {
+ Codec instance = this.createInstance();
+ return instance.toString();
+ }
+
}
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileReader.java Fri Feb 19 23:16:57 2010
@@ -80,20 +80,22 @@
syncBuffer[i++%SYNC_SIZE] = (byte)b;
} while (b != -1);
} catch (EOFException e) {
+ // fall through
+ }
+ // if no match or EOF, set blockStart to the end position
blockStart = sin.tell();
+ //System.out.println("block start location after EOF: " + blockStart );
return;
}
- }
@Override
- void skipSync() throws IOException { // note block start
- super.skipSync();
+ protected void blockFinished() throws IOException {
blockStart = sin.tell() - vin.inputStream().available();
}
/** Return true if past the next synchronization point after a position. */
public boolean pastSync(long position) throws IOException {
- return blockStart >= Math.min(sin.length(), position+SYNC_SIZE);
+ return ((blockStart >= position+SYNC_SIZE)||(blockStart >= sin.length()));
}
private static class SeekableInputStream extends InputStream
@@ -161,7 +163,7 @@
long remaining = (in.length() - in.tell());
return (remaining > Integer.MAX_VALUE) ? Integer.MAX_VALUE
: (int) remaining;
-}
+ }
}
}
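pastSync() pairs with sync() (whose tail is visible in the first hunk above) to support processing only a byte range of a file, e.g. MapReduce-style splits. A sketch of that pattern; process() is a hypothetical consumer:

    DataFileReader<Object> r =
        new DataFileReader<Object>(file, new GenericDatumReader<Object>());
    long start = 0, end = 1 << 20;     // byte range assigned to this worker
    r.sync(start);                     // skip to the first sync point after start
    while (r.hasNext() && !r.pastSync(end)) {
      process(r.next());               // hypothetical consumer
    }
    r.close();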
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileStream.java Fri Feb 19 23:16:57 2010
@@ -42,6 +42,8 @@
private Schema schema;
private DatumReader<D> reader;
+ private long blockSize;
+ private boolean availableBlock = false;
/** Decoder on raw input stream. (Used for metadata.) */
BinaryDecoder vin;
@@ -104,7 +106,7 @@
reader.setSchema(schema);
}
- private Codec resolveCodec() {
+ Codec resolveCodec() {
String codecStr = getMetaString(DataFileConstants.CODEC);
if (codecStr != null) {
return CodecFactory.fromString(codecStr).createInstance();
@@ -142,7 +144,7 @@
* pointer into the file. */
public Iterator<D> iterator() { return this; }
- private byte[] block = null;
+ private DataBlock block = null;
/** True if more entries remain in this file. */
public boolean hasNext() {
try {
@@ -154,19 +156,14 @@
throw new IOException("Block read partially, the data may be corrupt");
}
}
- blockRemaining = vin.readLong(); // read block count
- long compressedSize = vin.readLong(); // read block size
- if (compressedSize > Integer.MAX_VALUE ||
- compressedSize < 0) {
- throw new IOException("Block size invalid or too large for this " +
- "implementation: " + compressedSize);
+ if (hasNextBlock()) {
+ block = nextBlock(block);
+ ByteBuffer blockBuffer = ByteBuffer.wrap(block.data, 0, block.blockSize);
+ blockBuffer = codec.decompress(blockBuffer);
+ datumIn = DecoderFactory.defaultFactory().createBinaryDecoder(
+ blockBuffer.array(), blockBuffer.arrayOffset() +
+ blockBuffer.position(), blockBuffer.remaining(), datumIn);
}
- if (block == null || block.length < (int) compressedSize) {
- block = new byte[(int) compressedSize];
- }
- // throws if it can't read the size requested
- vin.readFixed(block, 0, (int)compressedSize);
- datumIn = codec.decompress(block, 0, (int) compressedSize);
}
return blockRemaining != 0;
} catch (EOFException e) { // at EOF
@@ -195,11 +192,53 @@
if (!hasNext())
throw new NoSuchElementException();
D result = reader.read(reuse, datumIn);
- if (--blockRemaining == 0)
- skipSync();
+ if (0 == --blockRemaining) {
+ blockFinished();
+ }
return result;
}
+ protected void blockFinished() throws IOException {
+ // nothing for the stream impl
+ }
+
+ boolean hasNextBlock() {
+ try {
+ if (availableBlock) return true;
+ if (vin.isEnd()) return false;
+ blockRemaining = vin.readLong(); // read block count
+ blockSize = vin.readLong(); // read block size
+ if (blockSize > Integer.MAX_VALUE ||
+ blockSize < 0) {
+ throw new IOException("Block size invalid or too large for this " +
+ "implementation: " + blockSize);
+ }
+ availableBlock = true;
+ return true;
+ } catch (EOFException eof) {
+ return false;
+ } catch (IOException e) {
+ throw new AvroRuntimeException(e);
+ }
+ }
+
+ DataBlock nextBlock(DataBlock reuse) throws IOException {
+ if (!hasNextBlock()) {
+ throw new NoSuchElementException();
+ }
+ if (reuse == null || reuse.data.length < (int) blockSize) {
+ reuse = new DataBlock(blockRemaining, (int) blockSize);
+ } else {
+ reuse.numEntries = blockRemaining;
+ reuse.blockSize = (int)blockSize;
+ }
+ // throws if it can't read the size requested
+ vin.readFixed(reuse.data, 0, reuse.blockSize);
+ skipSync();
+ availableBlock = false;
+ return reuse;
+ }
+
void skipSync() throws IOException {
vin.readFixed(syncBuffer);
if (!Arrays.equals(syncBuffer, sync))
@@ -214,5 +253,16 @@
vin.inputStream().close();
}
+ static class DataBlock {
+ byte[] data;
+ long numEntries;
+ int blockSize;
+ DataBlock(long numEntries, int blockSize) {
+ this.data = new byte[blockSize];
+ this.numEntries = numEntries;
+ this.blockSize = blockSize;
+ }
+ }
+
}
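The new hasNextBlock()/nextBlock()/DataBlock trio is the package-private machinery that appendAllFrom() (next file) builds on; it can also shuttle still-compressed blocks without decoding them. A sketch, given a DataFileStream<D> stream and assuming code in org.apache.avro.file; handle() is hypothetical:

    DataFileStream.DataBlock block = null;
    while (stream.hasNextBlock()) {
      block = stream.nextBlock(block);  // reuses block.data when it is large enough
      // block.data holds block.blockSize compressed bytes covering block.numEntries entries
      handle(block.data, block.blockSize, block.numEntries);
    }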
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DataFileWriter.java Fri Feb 19 23:16:57 2010
@@ -30,6 +30,7 @@
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.rmi.server.UID;
import java.security.MessageDigest;
@@ -39,6 +40,7 @@
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream.DataBlock;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
@@ -63,7 +65,7 @@
private long blockCount; // # entries in current block
- private ByteArrayOutputStream buffer;
+ private NonCopyingByteArrayOutputStream buffer;
private Encoder bufOut;
private byte[] sync; // 16 random bytes
@@ -181,12 +183,12 @@
this.out = new BufferedFileOutputStream(outs);
this.vout = new BinaryEncoder(out);
dout.setSchema(schema);
- this.buffer = new ByteArrayOutputStream(
+ buffer = new NonCopyingByteArrayOutputStream(
Math.min((int)(syncInterval * 1.25), Integer.MAX_VALUE/2 -1));
+ this.bufOut = new BinaryEncoder(buffer);
if (this.codec == null) {
this.codec = CodecFactory.nullCodec().createInstance();
}
- this.bufOut = new BinaryEncoder(buffer);
this.isOpen = true;
}
@@ -249,16 +251,72 @@
writeBlock();
}
+ /**
+ * Appends data from another file. otherFile must have the same schema.
+ * Data blocks will be copied without de-serializing data. If the codecs
+ * of the two files are compatible, data blocks are copied directly without
+ * decompression. If the codecs are not compatible, blocks from otherFile
+ * are uncompressed and then compressed using this file's codec.
+ * <p/>
+ * If the recompress flag is set, all blocks are decompressed and then compressed
+ * using this file's codec. This is useful when the two files have compatible
+ * compression codecs but different codec options. For example, one might
+ * append a file compressed with deflate at compression level 1 to a file with
+ * deflate at compression level 7. If <i>recompress</i> is false, blocks
+ * will be copied without changing the compression level. If true, they will
+ * be converted to the new compression level.
+ * @param otherFile the file whose blocks are appended to this one
+ * @param recompress if true, decompress and recompress every copied block
+ * @throws IOException
+ */
+ public void appendAllFrom(DataFileStream<D> otherFile, boolean recompress) throws IOException {
+ assertOpen();
+ // make sure other file has same schema
+ Schema otherSchema = otherFile.getSchema();
+ if (!this.schema.equals(otherSchema)) {
+ throw new IOException("Schema from file " + otherFile + " does not match");
+ }
+ // flush anything written so far
+ writeBlock();
+ Codec otherCodec = otherFile.resolveCodec();
+ DataBlock nextBlockRaw = null;
+ if (codec.equals(otherCodec) && !recompress) {
+ // copy raw bytes
+ while(otherFile.hasNextBlock()) {
+ nextBlockRaw = otherFile.nextBlock(nextBlockRaw);
+ writeRawBlock(nextBlockRaw);
+ }
+ } else {
+ while(otherFile.hasNextBlock()) {
+ nextBlockRaw = otherFile.nextBlock(nextBlockRaw);
+ ByteBuffer uncompressedData = otherCodec.decompress(ByteBuffer.wrap(
+ nextBlockRaw.data, 0, nextBlockRaw.blockSize));
+ ByteBuffer compressed = codec.compress(uncompressedData);
+ nextBlockRaw.data = compressed.array();
+ nextBlockRaw.blockSize = compressed.remaining();
+ writeRawBlock(nextBlockRaw);
+ }
+ }
+ }
+
+ private void writeRawBlock(DataBlock rawBlock) throws IOException {
+ vout.writeLong(rawBlock.numEntries);
+ vout.writeLong(rawBlock.blockSize);
+ vout.writeFixed(rawBlock.data, 0, rawBlock.blockSize);
+ vout.writeFixed(sync);
+ }
+
private void writeBlock() throws IOException {
if (blockCount > 0) {
vout.writeLong(blockCount);
- ByteArrayOutputStream block = codec.compress(buffer);
- vout.writeLong(block.size());
- vout.flush(); //vout may be buffered, flush before writing to out
- block.writeTo(out);
+ ByteBuffer uncompressed = buffer.getByteArrayAsByteBuffer();
+ ByteBuffer block = codec.compress(uncompressed);
+ vout.writeLong(block.remaining());
+ vout.writeFixed(block.array(), block.position() + block.arrayOffset(),
+ block.remaining());
buffer.reset();
blockCount = 0;
- out.write(sync);
+ vout.writeFixed(sync);
}
}
@@ -274,6 +332,7 @@
/** Flush the current state of the file. */
public void flush() throws IOException {
sync();
+ vout.flush();
out.flush();
}
@@ -303,4 +362,13 @@
public long tell() { return position+count; }
}
+ static class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream {
+ NonCopyingByteArrayOutputStream(int initialSize) {
+ super(initialSize);
+ }
+ ByteBuffer getByteArrayAsByteBuffer() {
+ return ByteBuffer.wrap(buf, 0, count);
+ }
+ }
+
}
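NonCopyingByteArrayOutputStream exists to avoid the defensive copy that ByteArrayOutputStream.toByteArray() makes on every call, which matters on the per-block compress path. A small sketch of the difference:

    NonCopyingByteArrayOutputStream bos = new NonCopyingByteArrayOutputStream(64);
    bos.write(new byte[] { 1, 2, 3 }, 0, 3);
    ByteBuffer view = bos.getByteArrayAsByteBuffer(); // wraps the internal array, no copy
    byte[] copy = bos.toByteArray();                  // allocates a fresh array and copies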
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/DeflateCodec.java Fri Feb 19 23:16:57 2010
@@ -18,16 +18,16 @@
package org.apache.avro.file;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.file.DataFileWriter.NonCopyingByteArrayOutputStream;
/**
* Implements DEFLATE (RFC1951) compression and decompression.
@@ -54,8 +54,11 @@
}
}
- ByteArrayOutputStream compressionBuffer;
+ NonCopyingByteArrayOutputStream compressionBuffer;
private Deflater deflater;
+ private Inflater inflater;
+ // currently only 'nowrap' is supported -- RFC 1951, not zlib
+ private boolean nowrap = true;
private int compressionLevel;
public DeflateCodec(int compressionLevel) {
@@ -68,32 +71,75 @@
}
@Override
- ByteArrayOutputStream compress(ByteArrayOutputStream buffer)
- throws IOException {
+ ByteBuffer compress(ByteBuffer data) throws IOException {
if (compressionBuffer == null) {
- compressionBuffer = new ByteArrayOutputStream(buffer.size());
- } else {
- compressionBuffer.reset();
+ compressionBuffer = new NonCopyingByteArrayOutputStream(
+ data.remaining());
}
if (deflater == null) {
- deflater = new Deflater(compressionLevel, true);
+ deflater = new Deflater(compressionLevel, nowrap);
}
// Pass output through deflate
DeflaterOutputStream deflaterStream =
new DeflaterOutputStream(compressionBuffer, deflater);
- buffer.writeTo(deflaterStream);
+ deflaterStream.write(data.array(),
+ data.position() + data.arrayOffset(),
+ data.remaining());
deflaterStream.finish();
+ ByteBuffer result = compressionBuffer.getByteArrayAsByteBuffer();
deflater.reset();
- return compressionBuffer;
+ compressionBuffer.reset();
+ return result;
}
@Override
- BinaryDecoder decompress(byte[] data, int offset, int length)
- throws IOException {
- Inflater inflater = new Inflater(true);
+ ByteBuffer decompress(ByteBuffer data) throws IOException {
+ if (compressionBuffer == null) {
+ compressionBuffer = new NonCopyingByteArrayOutputStream(
+ data.remaining());
+ }
+ if (inflater == null) {
+ inflater = new Inflater(nowrap);
+ }
InputStream uncompressed = new InflaterInputStream(
- new ByteArrayInputStream(data, offset, length), inflater);
- return DecoderFactory.defaultFactory().createBinaryDecoder(uncompressed, null);
+ new ByteArrayInputStream(data.array(),
+ data.position() + data.arrayOffset(),
+ data.remaining()), inflater);
+ int read;
+ byte[] buff = new byte[2048];
+ try {
+ while (true) {
+ read = uncompressed.read(buff);
+ if (read < 0) break;
+ compressionBuffer.write(buff, 0, read);
+ }
+ } catch (EOFException e) {
+ // sometimes InflaterInputStream.read
+ // throws this instead of returning -1
+ }
+ ByteBuffer result = compressionBuffer.getByteArrayAsByteBuffer();
+ inflater.reset();
+ compressionBuffer.reset();
+ return result;
+ }
+
+ @Override
+ public int hashCode() {
+ return nowrap ? 0 : 1;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+ DeflateCodec other = (DeflateCodec)obj;
+ return (this.nowrap == other.nowrap);
}
+ @Override
+ public String toString() {
+ return getName() + "-" + compressionLevel;
+ }
}
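For reference, the nowrap=true round trip the codec relies on, written directly against java.util.zip (a standalone sketch; the deflater and inflater must agree on nowrap or inflation fails):

    import java.util.zip.Deflater;
    import java.util.zip.Inflater;

    static byte[] roundTrip(byte[] input) throws Exception {
      Deflater deflater = new Deflater(Deflater.BEST_SPEED, true); // raw RFC 1951
      deflater.setInput(input);
      deflater.finish();
      byte[] compressed = new byte[input.length * 2 + 64]; // ample for small inputs
      int clen = deflater.deflate(compressed);
      deflater.end();

      Inflater inflater = new Inflater(true);  // nowrap must match the deflater
      inflater.setInput(compressed, 0, clen);
      byte[] restored = new byte[input.length];
      inflater.inflate(restored);
      inflater.end();
      return restored;
    }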
Modified: hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java?rev=912030&r1=912029&r2=912030&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java (original)
+++ hadoop/avro/trunk/lang/java/src/java/org/apache/avro/file/NullCodec.java Fri Feb 19 23:16:57 2010
@@ -17,11 +17,8 @@
*/
package org.apache.avro.file;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.BinaryDecoder;
+import java.nio.ByteBuffer;
/** Implements "null" (pass through) codec. */
final class NullCodec extends Codec {
@@ -29,13 +26,12 @@
private static final NullCodec INSTANCE = new NullCodec();
static class Option extends CodecFactory {
-
@Override
protected Codec createInstance() {
return INSTANCE;
}
-
}
+
/** No options available for NullCodec. */
public static final CodecFactory OPTION = new Option();
@@ -45,15 +41,24 @@
}
@Override
- ByteArrayOutputStream compress(ByteArrayOutputStream buffer)
- throws IOException {
+ ByteBuffer compress(ByteBuffer buffer) throws IOException {
return buffer;
}
-
+
+ @Override
+ ByteBuffer decompress(ByteBuffer data) throws IOException {
+ return data;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other)
+ return true;
+ return (other != null && this.getClass() == other.getClass());
+ }
+
@Override
- BinaryDecoder decompress(byte[] data, int offset, int length)
- throws IOException {
- return DecoderFactory.defaultFactory().createBinaryDecoder(
- data, offset, length, null);
+ public int hashCode() {
+ return 2;
}
}
Added: hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java?rev=912030&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java (added)
+++ hadoop/avro/trunk/lang/java/src/test/java/org/apache/avro/TestDataFileConcat.java Fri Feb 19 23:16:57 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class TestDataFileConcat {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestDataFileConcat.class);
+
+ CodecFactory codec = null;
+ CodecFactory codec2 = null;
+ boolean recompress;
+ public TestDataFileConcat(CodecFactory codec, CodecFactory codec2, Boolean recompress) {
+ this.codec = codec;
+ this.codec2 = codec2;
+ this.recompress = recompress;
+ LOG.info("Testing concatenating files, " + codec2 + " into " + codec +
+ " with recompress=" + recompress);
+ }
+
+ @Parameters
+ public static List<Object[]> codecs() {
+ List<Object[]> r = new ArrayList<Object[]>();
+ r.add(new Object[] { null , null, false});
+ r.add(new Object[] { null , null, true});
+ r.add(new Object[]
+ { CodecFactory.deflateCodec(1), CodecFactory.deflateCodec(6), false });
+ r.add(new Object[]
+ { CodecFactory.deflateCodec(1), CodecFactory.deflateCodec(6), true });
+ r.add(new Object[]
+ { CodecFactory.deflateCodec(3), CodecFactory.nullCodec(), false });
+ r.add(new Object[]
+ { CodecFactory.nullCodec(), CodecFactory.deflateCodec(6), false });
+ return r;
+ }
+
+ private static final int COUNT =
+ Integer.parseInt(System.getProperty("test.count", "200"));
+ private static final boolean VALIDATE =
+ !"false".equals(System.getProperty("test.validate", "true"));
+ private static final File DIR
+ = new File(System.getProperty("test.dir", "/tmp"));
+ private static final long SEED = System.currentTimeMillis();
+
+ private static final String SCHEMA_JSON =
+ "{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["
+ +"{\"name\":\"stringField\", \"type\":\"string\"},"
+ +"{\"name\":\"longField\", \"type\":\"long\"}]}";
+ private static final Schema SCHEMA = Schema.parse(SCHEMA_JSON);
+
+ private File makeFile(String name) {
+ return new File(DIR, "test-" + name + ".avro");
+ }
+
+ @Test
+ public void testConcatenateFiles() throws IOException {
+ File file1 = makeFile((codec == null ? "null" : codec.toString()) + "-A");
+ File file2 = makeFile((codec2 == null ? "null" : codec2.toString()) + "-B");
+ DataFileWriter<Object> writer =
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+ .setSyncInterval(500);
+ if (codec != null) {
+ writer.setCodec(codec);
+ }
+ writer.create(SCHEMA, file1);
+ try {
+ for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
+ writer.append(datum);
+ }
+ } finally {
+ writer.close();
+ }
+ DataFileWriter<Object> writer2 =
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+ .setSyncInterval(500);
+ if (codec2 != null) {
+ writer2.setCodec(codec2);
+ }
+ writer2.create(SCHEMA, file2);
+ try {
+ for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
+ writer2.append(datum);
+ }
+ } finally {
+ writer2.close();
+ }
+ DataFileWriter<Object> concatinto =
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+ .setSyncInterval(500);
+ concatinto.appendTo(file1);
+ DataFileReader<Object> concatfrom =
+ new DataFileReader<Object>(file2, new GenericDatumReader<Object>());
+ concatinto.appendAllFrom(concatfrom, recompress);
+ concatinto.close();
+
+ DataFileReader<Object> concat =
+ new DataFileReader<Object>(file1, new GenericDatumReader<Object>());
+
+ try {
+ Object datum = null;
+ if (VALIDATE) {
+ for (Object expected : new RandomData(SCHEMA, COUNT, SEED)) {
+ datum = concat.next(datum);
+ assertEquals(expected, datum);
+ }
+ for (Object expected : new RandomData(SCHEMA, COUNT, SEED+1)) {
+ datum = concat.next(datum);
+ assertEquals(expected, datum);
+ }
+ } else {
+ for (int i = 0; i < COUNT*2; i++) {
+ datum = concat.next(datum);
+ }
+ }
+ } finally {
+ concat.close();
+ }
+
+ }
+}