You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/06 20:37:44 UTC
svn commit: r209495 - in /lucene/nutch/branches/mapred/src: java/org/apache/nutch/io/ java/org/apache/nutch/mapred/ test/org/apache/nutch/mapred/
Author: cutting
Date: Wed Jul 6 11:37:44 2005
New Revision: 209495
URL: http://svn.apache.org/viewcvs?rev=209495&view=rev
Log:
Add unit test for SequenceFile InputFormat. Fix code to pass unit test. SequenceFile now inserts sync marks after a fixed number of bytes rather than after a fixed number of entries.
Added:
lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/
lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=209495&r1=209494&r2=209495&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Wed Jul 6 11:37:44 2005
@@ -39,10 +39,12 @@
};
private static final int SYNC_ESCAPE = -1; // "length" of sync entries
- private static final int SYNC_INTERVAL = 10; // num entries between syncs
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
+ /** The number of bytes between sync points.*/
+ public static final int SYNC_INTERVAL = 100*SYNC_SIZE;
+
/** Write key/value pairs to a sequence-format file. */
public static class Writer {
private NFSDataOutputStream out;
@@ -56,7 +58,7 @@
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
// starts and ends by scanning for this value.
- private long count; // number of entries added
+ private long lastSyncPos; // position of last sync
private final byte[] sync; // 16 random bytes
{
try { // use hash of uid + host
@@ -145,7 +147,9 @@
if (keyLength == 0)
throw new IOException("zero length keys not allowed");
- if ((++count % SYNC_INTERVAL) == 0) { // time to emit sync
+ if (out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
+ lastSyncPos = out.getPos(); // update lastSyncPos
+ //LOG.info("sync@"+lastSyncPos);
out.writeInt(SYNC_ESCAPE); // escape it
out.write(sync); // write sync
}
@@ -297,6 +301,7 @@
int length = in.readInt();
if (version[3] > 1 && length == SYNC_ESCAPE) { // process a sync entry
+ //LOG.info("sync@"+in.getPos());
in.readFully(syncCheck); // read syncCheck
if (!Arrays.equals(sync, syncCheck)) // check it
throw new IOException("File is corrupt!");
@@ -318,12 +323,12 @@
/** Seek to the next sync mark past a given position.*/
public synchronized void sync(long position) throws IOException {
- if (position+sync.length >= end) {
+ if (position+SYNC_SIZE >= end) {
seek(end);
return;
}
- seek(position);
+ seek(position+4); // skip escape
in.readFully(syncCheck);
int syncLen = sync.length;
for (int i = 0; in.getPos() < end; i++) {
@@ -332,8 +337,10 @@
if (sync[j] != syncCheck[(i+j)%syncLen])
break;
}
- if (j == syncLen)
+ if (j == syncLen) {
+ in.seek(in.getPos() - SYNC_SIZE); // position before sync
return;
+ }
syncCheck[i%syncLen] = in.readByte();
}
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java?rev=209495&r1=209494&r2=209495&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/FileSplit.java Wed Jul 6 11:37:44 2005
@@ -56,6 +56,7 @@
/** The number of bytes in the file to process. */
public long getLength() { return length; }
+ public String toString() { return file + ":" + start + "+" + length; }
////////////////////////////////////////////
// Writable methods
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java?rev=209495&r1=209494&r2=209495&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/InputFormatBase.java Wed Jul 6 11:37:44 2005
@@ -21,14 +21,25 @@
import java.util.Arrays;
import java.util.ArrayList;
+import java.util.logging.Logger;
import org.apache.nutch.fs.NutchFileSystem;
+import org.apache.nutch.util.LogFormatter;
/** A base class for {@link InputFormat}. */
public abstract class InputFormatBase implements InputFormat {
+ public static final Logger LOG =
+ LogFormatter.getLogger("org.apache.nutch.mapred.InputFormatBase");
+
private static final double SPLIT_SLOP = 0.1; // 10% slop
+ private int minSplitSize = 1;
+
+ protected void setMinSplitSize(int minSplitSize) {
+ this.minSplitSize = minSplitSize;
+ }
+
public abstract RecordReader getRecordReader(NutchFileSystem fs,
FileSplit split,
JobConf job) throws IOException;
@@ -84,8 +95,11 @@
totalSize += fs.getLength(files[i]);
}
- long bytesPerSplit = totalSize / numSplits;
+ long bytesPerSplit = Math.max(totalSize / numSplits, minSplitSize);
long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP);
+
+ //LOG.info("bytesPerSplit = " + bytesPerSplit);
+ //LOG.info("maxPerSplit = " + maxPerSplit);
ArrayList splits = new ArrayList(numSplits); // generate splits
for (int i = 0; i < files.length; i++) {
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java?rev=209495&r1=209494&r2=209495&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileInputFormat.java Wed Jul 6 11:37:44 2005
@@ -31,6 +31,10 @@
/** An {@link InputFormat} for {@link SequenceFile}s. */
public class SequenceFileInputFormat extends InputFormatBase {
+ public SequenceFileInputFormat() {
+ setMinSplitSize(SequenceFile.SYNC_INTERVAL);
+ }
+
protected File[] listFiles(NutchFileSystem fs, JobConf job)
throws IOException {
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java?rev=209495&r1=209494&r2=209495&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/SequenceFileRecordReader.java Wed Jul 6 11:37:44 2005
@@ -38,7 +38,10 @@
this.in = new SequenceFile.Reader(fs, split.getFile().toString());
this.end = split.getStart() + split.getLength();
- in.sync(split.getStart()); // sync to start
+ if (split.getStart() > in.getPosition())
+ in.sync(split.getStart()); // sync to start
+
+ more = in.getPosition() < end;
}
Added: lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java?rev=209495&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java (added)
+++ lucene/nutch/branches/mapred/src/test/org/apache/nutch/mapred/TestSequenceFileInputFormat.java Wed Jul 6 11:37:44 2005
@@ -0,0 +1,105 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import java.util.logging.*;
+
+import org.apache.nutch.fs.*;
+import org.apache.nutch.io.*;
+import org.apache.nutch.util.*;
+
+public class TestSequenceFileInputFormat extends TestCase {
+ private static final Logger LOG = InputFormatBase.LOG;
+
+ private static int MAX_LENGTH = 10000;
+
+ public void testFormat() throws Exception {
+ JobConf job = new JobConf(NutchConf.get());
+ NutchFileSystem fs = NutchFileSystem.getNamed("local");
+ File dir = new File(System.getProperty("test.build.data",".") + "/mrtest");
+ File file = new File(dir, "test.seq");
+
+ int seed = new Random().nextInt();
+ LOG.info("seed = "+seed);
+ Random random = new Random(seed);
+
+ dir.mkdirs();
+ job.setInputDir(dir);
+
+ // for a variety of lengths
+ for (int length = 0; length < MAX_LENGTH;
+ length+= random.nextInt(MAX_LENGTH/10)+1) {
+
+ LOG.info("creating; entries = " + length);
+
+ // create a file with length entries
+ file.delete();
+ SequenceFile.Writer writer =
+ new SequenceFile.Writer(fs, file.toString(),
+ IntWritable.class, BytesWritable.class);
+ try {
+ for (int i = 0; i < length; i++) {
+ IntWritable key = new IntWritable(i);
+ byte[] data = new byte[random.nextInt(10)];
+ random.nextBytes(data);
+ BytesWritable value = new BytesWritable(data);
+ writer.append(key, value);
+ }
+ } finally {
+ writer.close();
+ }
+
+ // try splitting the file in a variety of sizes
+ InputFormat format = new SequenceFileInputFormat();
+ IntWritable key = new IntWritable();
+ BytesWritable value = new BytesWritable();
+ for (int i = 0; i < 3; i++) {
+ int numSplits =
+ random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
+ LOG.info("splitting: requesting = " + numSplits);
+ FileSplit[] splits = format.getSplits(fs, job, numSplits);
+ LOG.info("splitting: got = " + splits.length);
+
+ // check each split
+ BitSet bits = new BitSet(length);
+ for (int j = 0; j < splits.length; j++) {
+ RecordReader reader = format.getRecordReader(fs, splits[j], job);
+ int count = 0;
+ while (reader.next(key, value)) {
+// if (bits.get(key.get())) {
+// LOG.info("splits["+j+"]="+splits[j]+" : " + key.get());
+// LOG.info("@"+reader.getPos());
+// }
+ assertFalse("Key in multiple partitions.", bits.get(key.get()));
+ bits.set(key.get());
+ count++;
+ }
+ //LOG.info("splits["+j+"]="+splits[j]+" count=" + count);
+ }
+ assertEquals("Some keys in no partition.", length, bits.cardinality());
+ }
+
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestSequenceFileInputFormat().testFormat();
+ }
+}