You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/26 23:11:43 UTC
svn commit: r500378 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/io/SequenceFile.java
src/test/org/apache/hadoop/io/TestSequenceFile.java
Author: cutting
Date: Fri Jan 26 14:11:42 2007
New Revision: 500378
URL: http://svn.apache.org/viewvc?view=rev&rev=500378
Log:
HADOOP-732. Add support to SequenceFile for arbitrary metadata, as a set of attribute value pairs. Contributed by Runping.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=500378&r1=500377&r2=500378
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 26 14:11:42 2007
@@ -67,6 +67,9 @@
in HDFS, try another replica of the data, if any.
(Wendy Chien via cutting)
+21. HADOOP-732. Add support to SequenceFile for arbitrary metadata,
+ as a set of attribute value pairs. (Runping Qi via cutting)
+
Release 0.10.1 - 2007-01-10
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=500378&r1=500377&r2=500378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Fri Jan 26 14:11:42 2007
@@ -50,8 +50,9 @@
private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
+ private static final byte VERSION_WITH_METADATA = (byte)6;
private static byte[] VERSION = new byte[] {
- (byte)'S', (byte)'E', (byte)'Q', CUSTOM_COMPRESS_VERSION
+ (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
};
private static final int SYNC_ESCAPE = -1; // "length" of sync entries
@@ -93,7 +94,7 @@
CompressionType val) {
job.set("io.seqfile.compression.type", val.toString());
}
-
+
/**
* Construct the preferred type of SequenceFile Writer.
* @param fs The configured filesystem.
@@ -130,7 +131,7 @@
Writer writer = null;
if (compressionType == CompressionType.NONE) {
- writer = new Writer(fs, conf, name, keyClass, valClass);
+ writer = new Writer(fs, conf, name, keyClass, valClass, null, new Metadata());
} else if (compressionType == CompressionType.RECORD) {
writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,
new DefaultCodec());
@@ -161,13 +162,13 @@
Writer writer = null;
if (compressionType == CompressionType.NONE) {
- writer = new Writer(fs, conf, name, keyClass, valClass, progress);
+ writer = new Writer(fs, conf, name, keyClass, valClass, progress, new Metadata());
} else if (compressionType == CompressionType.RECORD) {
writer = new RecordCompressWriter(fs, conf, name,
- keyClass, valClass, new DefaultCodec(), progress);
+ keyClass, valClass, new DefaultCodec(), progress, new Metadata());
} else if (compressionType == CompressionType.BLOCK){
writer = new BlockCompressWriter(fs, conf, name,
- keyClass, valClass, new DefaultCodec(), progress);
+ keyClass, valClass, new DefaultCodec(), progress, new Metadata());
}
return writer;
@@ -222,6 +223,7 @@
* @param compressionType The compression type.
* @param codec The compression codec.
* @param progress The Progressable object to track progress.
+ * @param metadata The metadata of the file.
* @return Returns the handle to the constructed SequenceFile Writer.
* @throws IOException
*/
@@ -229,7 +231,7 @@
createWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass,
CompressionType compressionType, CompressionCodec codec,
- Progressable progress) throws IOException {
+ Progressable progress, Metadata metadata) throws IOException {
if ((codec instanceof GzipCodec) &&
!NativeCodeLoader.isNativeCodeLoaded() &&
!ZlibFactory.isNativeZlibLoaded()) {
@@ -240,17 +242,40 @@
Writer writer = null;
if (compressionType == CompressionType.NONE) {
- writer = new Writer(fs, conf, name, keyClass, valClass, progress);
+ writer = new Writer(fs, conf, name, keyClass, valClass, progress, metadata);
} else if (compressionType == CompressionType.RECORD) {
writer = new RecordCompressWriter(fs, conf, name,
- keyClass, valClass, codec, progress);
+ keyClass, valClass, codec, progress, metadata);
} else if (compressionType == CompressionType.BLOCK){
writer = new BlockCompressWriter(fs, conf, name,
- keyClass, valClass, codec, progress);
+ keyClass, valClass, codec, progress, metadata);
}
return writer;
}
+
+ /**
+ * Construct the preferred type of SequenceFile Writer.
+ * @param fs The configured filesystem.
+ * @param conf The configuration.
+ * @param name The name of the file.
+ * @param keyClass The 'key' type.
+ * @param valClass The 'value' type.
+ * @param compressionType The compression type.
+ * @param codec The compression codec.
+ * @param progress The Progressable object to track progress.
+ * @return Returns the handle to the constructed SequenceFile Writer.
+ * @throws IOException
+ */
+ public static Writer
+ createWriter(FileSystem fs, Configuration conf, Path name,
+ Class keyClass, Class valClass,
+ CompressionType compressionType, CompressionCodec codec,
+ Progressable progress) throws IOException {
+ Writer writer = createWriter(fs, conf, name, keyClass, valClass,
+ compressionType, codec, progress, new Metadata());
+ return writer;
+ }
/**
* Construct the preferred type of 'raw' SequenceFile Writer.
@@ -259,13 +284,14 @@
* @param valClass The 'value' type.
* @param compress Compress data?
* @param blockCompress Compress blocks?
+ * @param metadata The metadata of the file.
* @return Returns the handle to the constructed SequenceFile Writer.
* @throws IOException
*/
private static Writer
createWriter(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valClass, boolean compress, boolean blockCompress,
- CompressionCodec codec)
+ CompressionCodec codec, Metadata metadata)
throws IOException {
if ((codec instanceof GzipCodec) &&
!NativeCodeLoader.isNativeCodeLoaded() &&
@@ -277,11 +303,11 @@
Writer writer = null;
if (!compress) {
- writer = new Writer(conf, out, keyClass, valClass);
+ writer = new Writer(conf, out, keyClass, valClass, metadata);
} else if (compress && !blockCompress) {
- writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec);
+ writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
} else {
- writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec);
+ writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
}
return writer;
@@ -289,19 +315,41 @@
/**
* Construct the preferred type of 'raw' SequenceFile Writer.
+ * @param out The stream on top of which the writer is to be constructed.
+ * @param keyClass The 'key' type.
+ * @param valClass The 'value' type.
+ * @param compress Compress data?
+ * @param blockCompress Compress blocks?
+ * @return Returns the handle to the constructed SequenceFile Writer.
+ * @throws IOException
+ */
+ private static Writer
+ createWriter(Configuration conf, FSDataOutputStream out,
+ Class keyClass, Class valClass, boolean compress, boolean blockCompress,
+ CompressionCodec codec)
+ throws IOException {
+ Writer writer = createWriter(conf, out, keyClass, valClass, compress,
+ blockCompress, codec, new Metadata());
+ return writer;
+ }
+
+
+ /**
+ * Construct the preferred type of 'raw' SequenceFile Writer.
* @param conf The configuration.
* @param out The stream on top of which the writer is to be constructed.
* @param keyClass The 'key' type.
* @param valClass The 'value' type.
* @param compressionType The compression type.
* @param codec The compression codec.
+ * @param metadata The metadata of the file.
* @return Returns the handle to the constructed SequenceFile Writer.
* @throws IOException
*/
public static Writer
createWriter(Configuration conf, FSDataOutputStream out,
Class keyClass, Class valClass, CompressionType compressionType,
- CompressionCodec codec)
+ CompressionCodec codec, Metadata metadata)
throws IOException {
if ((codec instanceof GzipCodec) &&
!NativeCodeLoader.isNativeCodeLoaded() &&
@@ -313,15 +361,37 @@
Writer writer = null;
if (compressionType == CompressionType.NONE) {
- writer = new Writer(conf, out, keyClass, valClass);
+ writer = new Writer(conf, out, keyClass, valClass, metadata);
} else if (compressionType == CompressionType.RECORD) {
- writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec);
+ writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
} else if (compressionType == CompressionType.BLOCK){
- writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec);
+ writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
}
return writer;
}
+
+ /**
+ * Construct the preferred type of 'raw' SequenceFile Writer.
+ * @param conf The configuration.
+ * @param out The stream on top of which the writer is to be constructed.
+ * @param keyClass The 'key' type.
+ * @param valClass The 'value' type.
+ * @param compressionType The compression type.
+ * @param codec The compression codec.
+ * @return Returns the handle to the constructed SequenceFile Writer.
+ * @throws IOException
+ */
+ public static Writer
+ createWriter(Configuration conf, FSDataOutputStream out,
+ Class keyClass, Class valClass, CompressionType compressionType,
+ CompressionCodec codec)
+ throws IOException {
+ Writer writer = createWriter(conf, out, keyClass, valClass, compressionType,
+ codec, new Metadata());
+ return writer;
+ }
+
/** The interface to 'raw' values of SequenceFiles. */
public static interface ValueBytes {
@@ -424,6 +494,99 @@
} // CompressedBytes
+ /**
+ * The class encapsulating the metadata of a file.
+ * The metadata of a file is a list of attribute name/value
+ * pairs of Text type.
+ *
+ */
+ static class Metadata implements Writable {
+
+ private TreeMap<Text, Text> theMetadata;
+
+ public Metadata() {
+ this(new TreeMap<Text, Text>());
+ }
+
+ public Metadata(TreeMap<Text, Text> arg) {
+ if (arg == null) {
+ this.theMetadata = new TreeMap<Text, Text>();
+ } else {
+ this.theMetadata = arg;
+ }
+ }
+
+ public Text get(Text name) {
+ return this.theMetadata.get(name);
+ }
+
+ public void set(Text name, Text value) {
+ this.theMetadata.put(name, value);
+ }
+
+ public TreeMap<Text, Text> getMetadata() {
+ return new TreeMap<Text, Text>(this.theMetadata);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(this.theMetadata.size());
+ Iterator iter = this.theMetadata.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<Text, Text> en = (Map.Entry<Text, Text>)iter.next();
+ en.getKey().write(out);
+ en.getValue().write(out);
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ int sz = in.readInt();
+ if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
+ this.theMetadata = new TreeMap<Text, Text>();
+ for (int i = 0; i < sz; i++) {
+ Text key = new Text();
+ Text val = new Text();
+ key.readFields(in);
+ val.readFields(in);
+ this.theMetadata.put(key, val);
+ }
+ }
+
+ public boolean equals(Metadata other) {
+ if (other == null) return false;
+ if (this.theMetadata.size() != other.theMetadata.size()) {
+ return false;
+ }
+ Iterator iter1 = this.theMetadata.entrySet().iterator();
+ Iterator iter2 = other.theMetadata.entrySet().iterator();
+ while (iter1.hasNext() && iter2.hasNext()) {
+ Map.Entry<Text, Text> en1 = (Map.Entry<Text, Text>)iter1.next();
+ Map.Entry<Text, Text> en2 = (Map.Entry<Text, Text>)iter2.next();
+ if (!en1.getKey().equals(en2.getKey())) {
+ return false;
+ }
+ if (!en1.getValue().equals(en2.getValue())) {
+ return false;
+ }
+ }
+ if (iter1.hasNext() || iter2.hasNext()) {
+ return false;
+ }
+ return true;
+ }
+
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("size: ").append(this.theMetadata.size()).append("\n");
+ Iterator iter = this.theMetadata.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<Text, Text> en = (Map.Entry<Text, Text>)iter.next();
+ sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+ }
+
/** Write key/value pairs to a sequence-format file. */
public static class Writer {
Configuration conf;
@@ -438,6 +601,7 @@
CompressionCodec codec = null;
CompressionOutputStream deflateFilter = null;
DataOutputStream deflateOut = null;
+ Metadata metadata = null;
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
@@ -462,24 +626,24 @@
public Writer(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass)
throws IOException {
- this(fs, conf, name, keyClass, valClass, null);
+ this(fs, conf, name, keyClass, valClass, null, new Metadata());
}
/** Create the named file with write-progress reporter. */
public Writer(FileSystem fs, Configuration conf, Path name,
- Class keyClass, Class valClass, Progressable progress)
+ Class keyClass, Class valClass, Progressable progress, Metadata metadata)
throws IOException {
- init(name, conf, fs.create(name, progress), keyClass, valClass, false, null);
+ init(name, conf, fs.create(name, progress), keyClass, valClass, false, null, metadata);
initializeFileHeader();
writeFileHeader();
finalizeFileHeader();
}
-
+
/** Write to an arbitrary stream using a specified buffer size. */
private Writer(Configuration conf, FSDataOutputStream out,
- Class keyClass, Class valClass)
+ Class keyClass, Class valClass, Metadata metadata)
throws IOException {
- init(null, conf, out, keyClass, valClass, false, null);
+ init(null, conf, out, keyClass, valClass, false, null, metadata);
initializeFileHeader();
writeFileHeader();
@@ -514,12 +678,13 @@
if(this.isCompressed()) {
Text.writeString(out, (codec.getClass()).getName());
}
+ this.metadata.write(out);
}
-
+
/** Initialize. */
void init(Path name, Configuration conf, FSDataOutputStream out,
Class keyClass, Class valClass,
- boolean compress, CompressionCodec codec)
+ boolean compress, CompressionCodec codec, Metadata metadata)
throws IOException {
this.target = name;
this.conf = conf;
@@ -528,6 +693,7 @@
this.valClass = valClass;
this.compress = compress;
this.codec = codec;
+ this.metadata = metadata;
if(this.codec != null) {
ReflectionUtils.setConf(this.codec, this.conf);
this.deflateFilter = this.codec.createOutputStream(buffer);
@@ -644,7 +810,7 @@
public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass, CompressionCodec codec)
throws IOException {
- super.init(name, conf, fs.create(name), keyClass, valClass, true, codec);
+ super.init(name, conf, fs.create(name), keyClass, valClass, true, codec, new Metadata());
initializeFileHeader();
writeFileHeader();
@@ -654,28 +820,36 @@
/** Create the named file with write-progress reporter. */
public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass, CompressionCodec codec,
- Progressable progress)
+ Progressable progress, Metadata metadata)
throws IOException {
super.init(name, conf, fs.create(name, progress),
- keyClass, valClass, true, codec);
+ keyClass, valClass, true, codec, metadata);
initializeFileHeader();
writeFileHeader();
finalizeFileHeader();
}
+ /** Create the named file with write-progress reporter. */
+ public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
+ Class keyClass, Class valClass, CompressionCodec codec,
+ Progressable progress)
+ throws IOException {
+ this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
+ }
+
/** Write to an arbitrary stream using a specified buffer size. */
private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
- Class keyClass, Class valClass, CompressionCodec codec)
+ Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
throws IOException {
- super.init(null, conf, out, keyClass, valClass, true, codec);
+ super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
initializeFileHeader();
writeFileHeader();
finalizeFileHeader();
}
-
+
boolean isCompressed() { return true; }
boolean isBlockCompressed() { return false; }
@@ -752,7 +926,7 @@
public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass, CompressionCodec codec)
throws IOException {
- super.init(name, conf, fs.create(name), keyClass, valClass, true, codec);
+ super.init(name, conf, fs.create(name), keyClass, valClass, true, codec, new Metadata());
init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
initializeFileHeader();
@@ -763,10 +937,10 @@
/** Create the named file with write-progress reporter. */
public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass, CompressionCodec codec,
- Progressable progress)
+ Progressable progress, Metadata metadata)
throws IOException {
super.init(name, conf, fs.create(name, progress), keyClass, valClass,
- true, codec);
+ true, codec, metadata);
init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
initializeFileHeader();
@@ -774,18 +948,26 @@
finalizeFileHeader();
}
+ /** Create the named file with write-progress reporter. */
+ public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
+ Class keyClass, Class valClass, CompressionCodec codec,
+ Progressable progress)
+ throws IOException {
+ this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
+ }
+
/** Write to an arbitrary stream using a specified buffer size. */
private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
- Class keyClass, Class valClass, CompressionCodec codec)
+ Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
throws IOException {
- super.init(null, conf, out, keyClass, valClass, true, codec);
+ super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
init(1000000);
initializeFileHeader();
writeFileHeader();
finalizeFileHeader();
}
-
+
boolean isCompressed() { return true; }
boolean isBlockCompressed() { return true; }
@@ -928,6 +1110,7 @@
private Class valClass;
private CompressionCodec codec = null;
+ private Metadata metadata = null;
private byte[] sync = new byte[SYNC_HASH_SIZE];
private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
@@ -1046,6 +1229,11 @@
}
}
+ this.metadata = new Metadata();
+ if (version >= VERSION_WITH_METADATA) { // if version >= 6
+ this.metadata.readFields(in);
+ }
+
if (version > 1) { // if version > 1
in.readFully(sync); // read sync bytes
}
@@ -1095,6 +1283,11 @@
/** Returns the compression codec of data in this file. */
public CompressionCodec getCompressionCodec() { return codec; }
+ /** Returns the metadata object of the file */
+ public Metadata getMetadata() {
+ return this.metadata;
+ }
+
/** Returns the configuration used for this file. */
Configuration getConf() { return conf; }
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java?view=diff&rev=500378&r1=500377&r2=500378
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java Fri Jan 26 14:11:42 2007
@@ -317,7 +317,91 @@
return sorter;
}
+ /** Unit tests for SequenceFile metadata. */
+ public void testSequenceFileMetadata() throws Exception {
+ LOG.info("Testing SequenceFile with metadata");
+ int count = 1024 * 10;
+ int megabytes = 1;
+ int factor = 5;
+ CompressionCodec codec = new DefaultCodec();
+ Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq.metadata");
+ Path recordCompressedFile =
+ new Path(System.getProperty("test.build.data",".")+"/test.rc.seq.metadata");
+ Path blockCompressedFile =
+ new Path(System.getProperty("test.build.data",".")+"/test.bc.seq.metadata");
+
+ FileSystem fs = FileSystem.getLocal(conf);
+ SequenceFile.Metadata theMetadata = new SequenceFile.Metadata();
+ theMetadata.set(new Text("name_1"), new Text("value_1"));
+ theMetadata.set(new Text("name_2"), new Text("value_2"));
+ theMetadata.set(new Text("name_3"), new Text("value_3"));
+ theMetadata.set(new Text("name_4"), new Text("value_4"));
+
+ int seed = new Random().nextInt();
+
+ try {
+ // SequenceFile.Writer
+ writeMetadataTest(fs, count, seed, file, CompressionType.NONE, null, theMetadata);
+ SequenceFile.Metadata aMetadata = readMetadata(fs, file);
+ if (!theMetadata.equals(aMetadata)) {
+ LOG.info("The original metadata:\n" + theMetadata.toString());
+ LOG.info("The retrieved metadata:\n" + aMetadata.toString());
+ throw new RuntimeException("metadata not match: " + 1);
+ }
+ // SequenceFile.RecordCompressWriter
+ writeMetadataTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD,
+ codec, theMetadata);
+ aMetadata = readMetadata(fs, recordCompressedFile);
+ if (!theMetadata.equals(aMetadata)) {
+ LOG.info("The original metadata:\n" + theMetadata.toString());
+ LOG.info("The retrieved metadata:\n" + aMetadata.toString());
+ throw new RuntimeException("metadata not match: " + 2);
+ }
+ // SequenceFile.BlockCompressWriter
+ writeMetadataTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK,
+ codec, theMetadata);
+ aMetadata =readMetadata(fs, blockCompressedFile);
+ if (!theMetadata.equals(aMetadata)) {
+ LOG.info("The original metadata:\n" + theMetadata.toString());
+ LOG.info("The retrieved metadata:\n" + aMetadata.toString());
+ throw new RuntimeException("metadata not match: " + 3);
+ }
+ } finally {
+ fs.close();
+ }
+ LOG.info("Successfully tested SequenceFile with metadata");
+ }
+
+
+ private static SequenceFile.Metadata readMetadata(FileSystem fs, Path file)
+ throws IOException {
+ LOG.info("reading file: " + file.toString() + "\n");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
+ SequenceFile.Metadata meta = reader.getMetadata();
+ reader.close();
+ return meta;
+ }
+ private static void writeMetadataTest(FileSystem fs, int count, int seed, Path file,
+ CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata)
+ throws IOException {
+ fs.delete(file);
+ LOG.info("creating " + count + " records with metadata and with" + compressionType +
+ " compression");
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, conf, file,
+ RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata);
+ RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+ for (int i = 0; i < count; i++) {
+ generator.next();
+ RandomDatum key = generator.getKey();
+ RandomDatum value = generator.getValue();
+
+ writer.append(key, value);
+ }
+ writer.close();
+ }
+
/** For debugging and testing. */
public static void main(String[] args) throws Exception {
int count = 1024 * 1024;