You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/10/17 21:52:43 UTC
svn commit: r585648 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/io/ src/test/org/apache/hadoop/io/
Author: omalley
Date: Wed Oct 17 12:52:42 2007
New Revision: 585648
URL: http://svn.apache.org/viewvc?rev=585648&view=rev
Log:
HADOOP-2033. Make SequenceFile.Writer.sync actually write a sync block.
Contributed by omalley.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestArrayFile.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=585648&r1=585647&r2=585648&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 17 12:52:42 2007
@@ -321,6 +321,10 @@
null was missing and hence NPE would be thrown sometimes. This issue fixes
that problem. (Amareshwari Sri Ramadasu via ddas)
+ HADOOP-2033. The SequenceFile.Writer.sync method was a no-op, which caused
+ very uneven splits for applications like distcp that count on them.
+ (omalley)
+
IMPROVEMENTS
HADOOP-1908. Restructure data node code so that block sending and
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java?rev=585648&r1=585647&r2=585648&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/MapFile.java Wed Oct 17 12:52:42 2007
@@ -19,6 +19,9 @@
package org.apache.hadoop.io;
import java.io.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
@@ -44,6 +47,8 @@
* SequenceFile.Sorter}.
*/
public class MapFile {
+ private static final Log LOG = LogFactory.getLog(MapFile.class);
+
/** The name of the index file. */
public static final String INDEX_FILE_NAME = "index";
@@ -302,7 +307,7 @@
count++;
}
} catch (EOFException e) {
- SequenceFile.LOG.warn("Unexpected EOF reading " + index +
+ LOG.warn("Unexpected EOF reading " + index +
" at entry #" + count + ". Ignoring.");
} finally {
indexClosed = true;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=585648&r1=585647&r2=585648&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Wed Oct 17 12:52:42 2007
@@ -20,7 +20,6 @@
import java.io.*;
import java.util.*;
-import java.net.InetAddress;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.commons.logging.*;
@@ -43,8 +42,7 @@
/** Support for flat files of binary key/value pairs. */
public class SequenceFile {
- public static final Log LOG =
- LogFactory.getLog("org.apache.hadoop.io.SequenceFile");
+ private static final Log LOG = LogFactory.getLog(SequenceFile.class);
private SequenceFile() {} // no public ctor
@@ -757,6 +755,11 @@
/** create a sync point */
public void sync() throws IOException {
+ if (sync != null && lastSyncPos != out.getPos()) {
+ out.writeInt(SYNC_ESCAPE); // mark the start of the sync
+ out.write(sync); // write sync
+ lastSyncPos = out.getPos(); // update lastSyncPos
+ }
}
/** Returns the configuration of this file. */
@@ -780,10 +783,7 @@
synchronized void checkAndWriteSync() throws IOException {
if (sync != null &&
out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
- lastSyncPos = out.getPos(); // update lastSyncPos
- //LOG.info("sync@"+lastSyncPos);
- out.writeInt(SYNC_ESCAPE); // escape it
- out.write(sync); // write sync
+ sync();
}
}
@@ -955,10 +955,6 @@
val.writeCompressedBytes(out); // 'value' data
}
-
- public void sync() throws IOException {
- }
-
} // RecordCompressionWriter
/** Write compressed key/value blocks to a sequence-format file. */
@@ -1045,13 +1041,9 @@
}
/** Compress and flush contents to dfs */
- private synchronized void writeBlock() throws IOException {
+ public synchronized void sync() throws IOException {
if (noBufferedRecords > 0) {
- // Write 'sync' marker
- if (sync != null) {
- out.writeInt(SYNC_ESCAPE);
- out.write(sync);
- }
+ super.sync();
// No. of records
WritableUtils.writeVInt(out, noBufferedRecords);
@@ -1080,15 +1072,11 @@
/** Close the file. */
public synchronized void close() throws IOException {
if (out != null) {
- writeBlock();
+ sync();
}
super.close();
}
- public void sync() throws IOException {
- writeBlock();
- }
-
/** Append a key/value pair. */
public synchronized void append(Writable key, Writable val)
throws IOException {
@@ -1116,7 +1104,7 @@
// Compress and flush?
int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
if (currentBlockSize >= compressionBlockSize) {
- writeBlock();
+ sync();
}
}
@@ -1144,7 +1132,7 @@
// Compress and flush?
int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
if (currentBlockSize >= compressionBlockSize) {
- writeBlock();
+ sync();
}
}
@@ -1586,15 +1574,26 @@
return more;
}
- private synchronized int checkAndReadSync(int length)
- throws IOException {
+ /**
+ * Read and return the next record length, potentially skipping over
+ * a sync block.
+ * @return the length of the next record or -1 if there is no next record
+ * @throws IOException
+ */
+ private synchronized int readRecordLength() throws IOException {
+ if (in.getPos() >= end) {
+ return -1;
+ }
+ int length = in.readInt();
if (version > 1 && sync != null &&
length == SYNC_ESCAPE) { // process a sync entry
- //LOG.info("sync@"+in.getPos());
in.readFully(syncCheck); // read syncCheck
if (!Arrays.equals(sync, syncCheck)) // check it
throw new IOException("File is corrupt!");
syncSeen = true;
+ if (in.getPos() >= end) {
+ return -1;
+ }
length = in.readInt(); // re-read length
} else {
syncSeen = false;
@@ -1614,11 +1613,11 @@
throw new IOException("Unsupported call for block-compressed" +
" SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
}
- if (in.getPos() >= end)
- return -1;
-
try {
- int length = checkAndReadSync(in.readInt());
+ int length = readRecordLength();
+ if (length == -1) {
+ return -1;
+ }
int keyLength = in.readInt();
buffer.write(in, length);
return keyLength;
@@ -1642,16 +1641,16 @@
* Read 'raw' records.
* @param key - The buffer into which the key is read
* @param val - The 'raw' value
- * @return Returns the total record length
+ * @return Returns the total record length or -1 for end of file
* @throws IOException
*/
public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val)
throws IOException {
if (!blockCompressed) {
- if (in.getPos() >= end)
+ int length = readRecordLength();
+ if (length == -1) {
return -1;
-
- int length = checkAndReadSync(in.readInt());
+ }
int keyLength = in.readInt();
int valLength = length - keyLength;
key.write(in, keyLength);
@@ -1701,16 +1700,16 @@
/**
* Read 'raw' keys.
* @param key - The buffer into which the key is read
- * @return Returns the key length
+ * @return Returns the key length or -1 for end of file
* @throws IOException
*/
public int nextRawKey(DataOutputBuffer key)
throws IOException {
if (!blockCompressed) {
- if (in.getPos() >= end)
+ recordLength = readRecordLength();
+ if (recordLength == -1) {
return -1;
-
- recordLength = checkAndReadSync(in.readInt());
+ }
keyLength = in.readInt();
key.write(in, keyLength);
return keyLength;
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestArrayFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestArrayFile.java?rev=585648&r1=585647&r2=585648&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestArrayFile.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestArrayFile.java Wed Oct 17 12:52:42 2007
@@ -28,7 +28,7 @@
/** Support for flat files of binary key/value pairs. */
public class TestArrayFile extends TestCase {
- private static Log LOG = SequenceFile.LOG;
+ private static final Log LOG = LogFactory.getLog(TestArrayFile.class);
private static String FILE =
System.getProperty("test.build.data",".") + "/test.array";
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java?rev=585648&r1=585647&r2=585648&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java Wed Oct 17 12:52:42 2007
@@ -35,7 +35,7 @@
/** Support for flat files of binary key/value pairs. */
public class TestSequenceFile extends TestCase {
- private static Log LOG = SequenceFile.LOG;
+ private static final Log LOG = LogFactory.getLog(TestSequenceFile.class);
private static Configuration conf = new Configuration();
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java?rev=585648&r1=585647&r2=585648&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestSetFile.java Wed Oct 17 12:52:42 2007
@@ -30,7 +30,7 @@
/** Support for flat files of binary key/value pairs. */
public class TestSetFile extends TestCase {
- private static Log LOG = SequenceFile.LOG;
+ private static final Log LOG = LogFactory.getLog(TestSetFile.class);
private static String FILE =
System.getProperty("test.build.data",".") + "/test.set";