You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/15 02:13:33 UTC
svn commit: r443532 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/io/SequenceFile.java
src/test/org/apache/hadoop/io/RandomDatum.java
src/test/org/apache/hadoop/io/TestSequenceFile.java
Author: cutting
Date: Thu Sep 14 17:13:32 2006
New Revision: 443532
URL: http://svn.apache.org/viewvc?view=rev&rev=443532
Log:
HADOOP-532. Fix a bug reading value-compressed sequence files, where an exception was thrown reporting that the full value had not been read. Contributed by Owen.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/RandomDatum.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=443532&r1=443531&r2=443532
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Sep 14 17:13:32 2006
@@ -17,6 +17,10 @@
4. HADOOP-288. Add a file caching system and use it in MapReduce to
cache job jar files on slave nodes. (Mahadev Konar via cutting)
+5. HADOOP-532. Fix a bug reading value-compressed sequence files,
+ where an exception was thrown reporting that the full value had not
+ been read. (omalley via cutting)
+
Release 0.6.1 - 2006-08-13
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=443532&r1=443531&r2=443532
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Sep 14 17:13:32 2006
@@ -976,10 +976,14 @@
if (version > 2) { // if version > 2
this.decompress = in.readBoolean(); // is compressed?
+ } else {
+ decompress = false;
}
if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4
this.blockCompressed = in.readBoolean(); // is block-compressed?
+ } else {
+ blockCompressed = false;
}
// if version >= 5
@@ -1008,9 +1012,9 @@
valBuffer = new DataInputBuffer();
if (decompress) {
valInFilter = this.codec.createInputStream(valBuffer);
- valIn = new DataInputStream(new BufferedInputStream(valInFilter));
+ valIn = new DataInputStream(valInFilter);
} else {
- valIn = new DataInputStream(new BufferedInputStream(valBuffer));
+ valIn = valBuffer;
}
if (blockCompressed) {
@@ -1113,10 +1117,11 @@
* corresponding to the 'current' key
*/
private synchronized void seekToCurrentValue() throws IOException {
- if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+ if (!blockCompressed) {
if (decompress) {
valInFilter.resetState();
}
+ valBuffer.reset();
} else {
// Check if this is the first value in the 'block' to be read
if (lazyDecompress && !valuesDecompressed) {
@@ -1160,13 +1165,15 @@
// Position stream to 'current' value
seekToCurrentValue();
- if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+ if (!blockCompressed) {
val.readFields(valIn);
- if (valBuffer.getPosition() != valBuffer.getLength())
+ if (valIn.read() > 0) {
+ LOG.info("available bytes: " + valIn.available());
throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
+ " bytes, should read " +
(valBuffer.getLength()-keyLength));
+ }
} else {
// Get the value
int valLength = WritableUtils.readVInt(valLenIn);
@@ -1190,7 +1197,7 @@
throw new IOException("wrong key class: "+key.getClass().getName()
+" is not "+keyClass);
- if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+ if (!blockCompressed) {
outBuf.reset();
keyLength = next(outBuf);
@@ -1200,6 +1207,7 @@
valBuffer.reset(outBuf.getData(), outBuf.getLength());
key.readFields(valBuffer);
+ valBuffer.mark(0);
if (valBuffer.getPosition() != keyLength)
throw new IOException(key + " read " + valBuffer.getPosition()
+ " bytes, should read " + keyLength);
@@ -1271,7 +1279,7 @@
/** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
public synchronized int next(DataOutputBuffer buffer) throws IOException {
// Unsupported for block-compressed sequence files
- if (version >= BLOCK_COMPRESS_VERSION && blockCompressed) {
+ if (blockCompressed) {
throw new IOException("Unsupported call for block-compressed" +
" SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
}
@@ -1308,7 +1316,7 @@
*/
public int nextRaw(DataOutputBuffer key, ValueBytes val)
throws IOException {
- if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+ if (!blockCompressed) {
if (in.getPos() >= end)
return -1;
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/RandomDatum.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/RandomDatum.java?view=diff&rev=443532&r1=443531&r2=443532
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/RandomDatum.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/RandomDatum.java Thu Sep 14 17:13:32 2006
@@ -26,11 +26,15 @@
public RandomDatum() {}
public RandomDatum(Random random) {
- length = 10 + random.nextInt(100);
+ length = 10 + (int) Math.pow(10.0, random.nextFloat() * 3.0);
data = new byte[length];
random.nextBytes(data);
}
+ public int getLength() {
+ return length;
+ }
+
public void write(DataOutput out) throws IOException {
out.writeInt(length);
out.write(data);
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java?view=diff&rev=443532&r1=443531&r2=443532
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java Thu Sep 14 17:13:32 2006
@@ -47,6 +47,7 @@
new Path(System.getProperty("test.build.data",".")+"/test.bc.seq");
int seed = new Random().nextInt();
+ LOG.info("Seed = " + seed);
FileSystem fs = new LocalFileSystem(conf);
try {
@@ -115,7 +116,8 @@
CompressionType compressionType)
throws IOException {
fs.delete(file);
- LOG.debug("creating with " + count + " records");
+ LOG.info("creating " + count + " records with " + compressionType +
+ " compression");
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, conf, file,
RandomDatum.class, RandomDatum.class, compressionType);
@@ -146,25 +148,36 @@
RandomDatum key = generator.getKey();
RandomDatum value = generator.getValue();
- if ((i%5) == 10) {
- // Testing 'raw' apis
- rawKey.reset();
- reader.nextRaw(rawKey, rawValue);
- } else {
- // Testing 'non-raw' apis
- if ((i%2) == 0) {
- reader.next(k);
- reader.getCurrentValue(v);
+ try {
+ if ((i%5) == 10) {
+ // Testing 'raw' apis
+ rawKey.reset();
+ reader.nextRaw(rawKey, rawValue);
} else {
- reader.next(k, v);
+ // Testing 'non-raw' apis
+ if ((i%2) == 0) {
+ reader.next(k);
+ reader.getCurrentValue(v);
+ } else {
+ reader.next(k, v);
+ }
+ // Sanity check
+ if (!k.equals(key))
+ throw new RuntimeException("wrong key at " + i);
+ if (!v.equals(value))
+ throw new RuntimeException("wrong value at " + i);
}
-
- // Sanity check
- if (!k.equals(key))
- throw new RuntimeException("wrong key at " + i);
- if (!v.equals(value))
- throw new RuntimeException("wrong value at " + i);
+ } catch (IOException ioe) {
+ LOG.info("Problem on row " + i);
+ LOG.info("Expected value = " + value);
+ LOG.info("Expected len = " + value.getLength());
+ LOG.info("Actual value = " + v);
+ LOG.info("Actual len = " + v.getLength());
+ LOG.info("Key equals: " + k.equals(key));
+ LOG.info("value equals: " + v.equals(value));
+ throw ioe;
}
+
}
reader.close();
}
@@ -284,9 +297,11 @@
boolean merge = false;
String compressType = "NONE";
Path file = null;
+ int seed = new Random().nextInt();
String usage = "Usage: SequenceFile (-local | -dfs <namenode:port>) " +
- "[-count N] " + "[-check] [-compressType <NONE|RECORD|BLOCK>] " +
+ "[-count N] " +
+ "[-seed #] [-check] [-compressType <NONE|RECORD|BLOCK>] " +
"[[-rwonly] | {[-megabytes M] [-factor F] [-nocreate] [-fast] [-merge]}] " +
" file";
if (args.length == 0) {
@@ -304,7 +319,9 @@
} else if (args[i].equals("-megabytes")) {
megabytes = Integer.parseInt(args[++i]);
} else if (args[i].equals("-factor")) {
- factor = Integer.parseInt(args[++i]);
+ factor = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-seed")) {
+ seed = Integer.parseInt(args[++i]);
} else if (args[i].equals("-rwonly")) {
rwonly = true;
} else if (args[i].equals("-nocreate")) {
@@ -326,6 +343,7 @@
LOG.info("megabytes = " + megabytes);
LOG.info("factor = " + factor);
LOG.info("create = " + create);
+ LOG.info("seed = " + seed);
LOG.info("rwonly = " + rwonly);
LOG.info("check = " + check);
LOG.info("fast = " + fast);
@@ -338,7 +356,6 @@
System.exit(-1);
}
- int seed = 0;
CompressionType compressionType =
CompressionType.valueOf(compressType);