Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/05/04 07:34:39 UTC
svn commit: r771181 - in /hadoop/core/trunk: ./ src/mapred/
src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/
src/test/org/apache/hadoop/mapreduce/
Author: ddas
Date: Mon May 4 05:34:36 2009
New Revision: 771181
URL: http://svn.apache.org/viewvc?rev=771181&view=rev
Log:
HADOOP-5266. Adds the capability to do mark/reset of the reduce values iterator in the Context object API. Contributed by Jothi Padmanabhan.
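
For context, a minimal usage sketch of what this enables inside a new-API reducer (the reducer below is illustrative and not part of this commit; only MarkableIterator and its mark()/reset() methods are introduced here, and the usual org.apache.hadoop.io / org.apache.hadoop.mapreduce imports are assumed):

    public static class TwoPassReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        // Wrap the framework's value iterator; the wrapped iterator must
        // implement MarkableIteratorInterface (ReduceContext.ValueIterator does)
        MarkableIterator<IntWritable> mitr =
            new MarkableIterator<IntWritable>(values.iterator());
        mitr.mark();                 // remember the start of the value list
        int count = 0;
        while (mitr.hasNext()) {     // first pass: count the values
          mitr.next();
          count++;
        }
        mitr.reset();                // rewind to the mark
        while (mitr.hasNext()) {     // second pass: re-read the same values
          context.write(key, mitr.next());
        }
      }
    }
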
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIterator.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIteratorInterface.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestValueIterReset.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/mapred-default.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 4 05:34:36 2009
@@ -104,6 +104,9 @@
HADOOP-5752. Add a new hdfs image processor, Delimited, to oiv. (Jakob
Homan via szetszwo)
+ HADOOP-5266. Adds the capability to do mark/reset of the reduce values
+ iterator in the Context object API. (Jothi Padmanabhan via ddas)
+
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information
Modified: hadoop/core/trunk/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/mapred-default.xml?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/mapred-default.xml (original)
+++ hadoop/core/trunk/src/mapred/mapred-default.xml Mon May 4 05:34:36 2009
@@ -524,6 +524,14 @@
</property>
<property>
+ <name>mapred.job.reduce.markreset.buffer.percent</name>
+ <value>0.0</value>
+ <description>The percentage of memory (relative to the maximum heap size)
+ to be used for caching values when using the mark-reset functionality.
+ </description>
+</property>
+
+<property>
<name>mapred.map.tasks.speculative.execution</name>
<value>true</value>
<description>If true, then multiple instances of some map tasks
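
For illustration, a job can tune this either through the percentage above or through the absolute-size key mapred.job.reduce.markreset.buffer.size, which BackupStore also reads and which takes precedence when positive (see its constructor in the added file below; the values here are arbitrary, and an org.apache.hadoop.conf.Configuration import is assumed):

    Configuration conf = new Configuration();
    // cache up to 30% of the reducer's max heap for mark/reset
    conf.setFloat("mapred.job.reduce.markreset.buffer.percent", 0.3f);
    // or cap the cache at an absolute size in bytes; a value > 0 wins
    conf.setInt("mapred.job.reduce.markreset.buffer.size", 4 * 1024 * 1024);
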
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java?rev=771181&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java Mon May 4 05:34:36 2009
@@ -0,0 +1,611 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.IFile.InMemoryReader;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * <code>BackupStore</code> is a utility class used to support
+ * the mark-reset functionality of the reduce values iterator.
+ *
+ * <p>It has two caches - a memory cache and a file cache - where values are
+ * stored as they are iterated over, after a mark. On reset, values are
+ * retrieved from these caches. The framework moves from the memory cache to
+ * the file cache when the memory cache becomes full.
+ *
+ */
+public class BackupStore<K,V> {
+
+ private static final Log LOG = LogFactory.getLog(BackupStore.class.getName());
+ private static final int MAX_VINT_SIZE = 9;
+ private static final int EOF_MARKER_SIZE = 2 * MAX_VINT_SIZE;
+ private final TaskAttemptID tid;
+
+ private MemoryCache memCache;
+ private FileCache fileCache;
+
+ List<Segment<K,V>> segmentList = new LinkedList<Segment<K,V>>();
+ private int readSegmentIndex = 0;
+ private int firstSegmentOffset = 0;
+
+ private int currentKVOffset = 0;
+ private int nextKVOffset = -1;
+
+ private DataInputBuffer currentKey = null;
+ private DataInputBuffer currentValue = new DataInputBuffer();
+ private DataInputBuffer currentDiskValue = new DataInputBuffer();
+
+ private boolean hasMore = false;
+ private boolean inReset = false;
+ private boolean clearMarkFlag = false;
+ private boolean lastSegmentEOF = false;
+
+ public BackupStore(Configuration conf, TaskAttemptID taskid)
+ throws IOException {
+
+ final float bufferPercent =
+ conf.getFloat("mapred.job.reduce.markreset.buffer.percent", 0f);
+
+ if (bufferPercent > 1.0 || bufferPercent < 0.0) {
+ throw new IOException("mapred.job.reduce.markreset.buffer.percent" +
+ bufferPercent);
+ }
+
+ int maxSize = (int)Math.min(
+ Runtime.getRuntime().maxMemory() * bufferPercent, Integer.MAX_VALUE);
+
+ // Support an absolute size also.
+ int tmp = conf.getInt("mapred.job.reduce.markreset.buffer.size", 0);
+ if (tmp > 0) {
+ maxSize = tmp;
+ }
+
+ memCache = new MemoryCache(maxSize);
+ fileCache = new FileCache(conf);
+ tid = taskid;
+
+ LOG.info("Created a new BackupStore with a memory of " + maxSize);
+
+ }
+
+ /**
+ * Write the given K,V to the cache.
+ * Write to memcache if space is available, else write to the filecache
+ * @param key
+ * @param value
+ * @throws IOException
+ */
+ public void write(DataInputBuffer key, DataInputBuffer value)
+ throws IOException {
+
+ assert (key != null && value != null);
+
+ if (fileCache.isActive()) {
+ fileCache.write(key, value);
+ return;
+ }
+
+ if (memCache.reserveSpace(key, value)) {
+ memCache.write(key, value);
+ } else {
+ fileCache.activate();
+ fileCache.write(key, value);
+ }
+ }
+
+ public void mark() throws IOException {
+
+ // We read one KV pair in advance in hasNext.
+ // If hasNext has read the next KV pair from a new segment, but the
+ // user has not called next() for that KV, then reset the readSegmentIndex
+ // to the previous segment
+
+ if (nextKVOffset == 0) {
+ assert (readSegmentIndex != 0);
+ assert (currentKVOffset != 0);
+ readSegmentIndex --;
+ }
+
+ // just drop segments before the current active segment
+
+ int i = 0;
+ Iterator<Segment<K,V>> itr = segmentList.iterator();
+ while (itr.hasNext()) {
+ Segment<K,V> s = itr.next();
+ if (i == readSegmentIndex) {
+ break;
+ }
+ s.close();
+ itr.remove();
+ i++;
+ LOG.debug("Dropping a segment");
+ }
+
+ // FirstSegmentOffset is the offset in the current segment from where we
+ // need to start reading on the next reset
+
+ firstSegmentOffset = currentKVOffset;
+ readSegmentIndex = 0;
+
+ LOG.debug("Setting the FirsSegmentOffset to " + currentKVOffset);
+ }
+
+ public void reset() throws IOException {
+
+ // Create a new segment for the previously written records only if we
+ // are not already in the reset mode
+
+ if (!inReset) {
+ if (fileCache.isActive) {
+ fileCache.createInDiskSegment();
+ } else {
+ memCache.createInMemorySegment();
+ }
+ }
+
+ inReset = true;
+
+ // Reset the segments to the correct position from where the next read
+ // should begin.
+ for (int i = 0; i < segmentList.size(); i++) {
+ Segment<K,V> s = segmentList.get(i);
+ if (s.inMemory()) {
+ int offset = (i == 0) ? firstSegmentOffset : 0;
+ s.getReader().reset(offset);
+ } else {
+ s.closeReader();
+ if (i == 0) {
+ s.reinitReader(firstSegmentOffset);
+ s.getReader().disableChecksumValidation();
+ }
+ }
+ }
+
+ currentKVOffset = firstSegmentOffset;
+ nextKVOffset = -1;
+ readSegmentIndex = 0;
+ hasMore = false;
+ lastSegmentEOF = false;
+
+ LOG.debug("Reset - First segment offset is " + firstSegmentOffset +
+ " Segment List Size is " + segmentList.size());
+ }
+
+ public boolean hasNext() throws IOException {
+
+ if (lastSegmentEOF) {
+ return false;
+ }
+
+ // We read the next KV from the cache to decide if there is any left.
+ // Since hasNext can be called several times before the actual call to
+ // next(), we use hasMore to avoid extra reads. hasMore is set to false
+ // when the user actually consumes this record in next()
+
+ if (hasMore) {
+ return true;
+ }
+
+ Segment<K,V> seg = segmentList.get(readSegmentIndex);
+ // Mark the current position. This would be set to currentKVOffset
+ // when the user consumes this record in next().
+ nextKVOffset = (int) seg.getActualPosition();
+ if (seg.nextRawKey()) {
+ currentKey = seg.getKey();
+ seg.getValue(currentValue);
+ hasMore = true;
+ return true;
+ } else {
+ if (!seg.inMemory()) {
+ seg.closeReader();
+ }
+ }
+
+ // If this is the last segment, mark the lastSegmentEOF flag and return
+ if (readSegmentIndex == segmentList.size() - 1) {
+ nextKVOffset = -1;
+ lastSegmentEOF = true;
+ return false;
+ }
+
+ nextKVOffset = 0;
+ readSegmentIndex ++;
+
+ Segment<K,V> nextSegment = segmentList.get(readSegmentIndex);
+
+ // We possibly are moving from a memory segment to a disk segment.
+ // Reset so that we do not corrupt the in-memory segment buffer.
+ // See HADOOP-5494
+
+ if (!nextSegment.inMemory()) {
+ currentValue.reset(currentDiskValue.getData(),
+ currentDiskValue.getLength());
+ nextSegment.init(null);
+ }
+
+ if (nextSegment.nextRawKey()) {
+ currentKey = nextSegment.getKey();
+ nextSegment.getValue(currentValue);
+ hasMore = true;
+ return true;
+ } else {
+ throw new IOException("New segment did not have even one K/V");
+ }
+ }
+
+ public void next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException("iterate past last value");
+ }
+ // Reset hasMore. See comment in hasNext()
+ hasMore = false;
+ currentKVOffset = nextKVOffset;
+ nextKVOffset = -1;
+ }
+
+ public DataInputBuffer nextValue() {
+ return currentValue;
+ }
+
+ public DataInputBuffer nextKey() {
+ return currentKey;
+ }
+
+ public void reinitialize() throws IOException {
+ if (segmentList.size() != 0) {
+ clearSegmentList();
+ }
+ memCache.reinitialize(true);
+ fileCache.reinitialize();
+ readSegmentIndex = firstSegmentOffset = 0;
+ currentKVOffset = 0;
+ nextKVOffset = -1;
+ hasMore = inReset = clearMarkFlag = false;
+ }
+
+ /**
+ * This function is called by the ValuesIterator when a mark is called
+ * outside of a reset zone.
+ */
+ public void exitResetMode() throws IOException {
+ inReset = false;
+ if (clearMarkFlag ) {
+ // If a flag was set to clear mark, do the reinit now.
+ // See clearMark()
+ reinitialize();
+ return;
+ }
+ if (!fileCache.isActive) {
+ memCache.reinitialize(false);
+ }
+ }
+
+ /** For writing the first key and value bytes directly from the
+ * value iterators, return the current underlying output stream.
+ * @param length The length of the impending write
+ */
+ public DataOutputStream getOutputStream(int length) throws IOException {
+ if (memCache.reserveSpace(length)) {
+ return memCache.dataOut;
+ } else {
+ fileCache.activate();
+ return fileCache.writer.getOutputStream();
+ }
+ }
+
+ /** This method is called by the valueIterators after writing the first
+ * key and value bytes to the BackupStore
+ * @param length
+ */
+ public void updateCounters(int length) {
+ if (fileCache.isActive) {
+ fileCache.writer.updateCountersForExternalAppend(length);
+ } else {
+ memCache.usedSize += length;
+ }
+ }
+
+ public void clearMark() throws IOException {
+ if (inReset) {
+ // If we are in reset mode, just set a flag and return. The actual
+ // reinitialization is done when we exit reset mode.
+ clearMarkFlag = true;
+ } else {
+ reinitialize();
+ }
+ }
+
+ private void clearSegmentList() throws IOException {
+ for (Segment<K,V> segment: segmentList) {
+ segment.close();
+ }
+ segmentList.clear();
+ }
+
+ class MemoryCache {
+ private DataOutputBuffer dataOut;
+ private int blockSize;
+ private int usedSize;
+ private final BackupRamManager ramManager;
+
+ // Memory cache is made up of blocks.
+ private int defaultBlockSize = 1024 * 1024;
+
+ public MemoryCache(int maxSize) {
+ ramManager = new BackupRamManager(maxSize);
+ if (maxSize < defaultBlockSize) {
+ defaultBlockSize = maxSize;
+ }
+ }
+
+ /**
+ * Re-initialize the memory cache.
+ *
+ * @param clearAll If true, re-initialize the ramManager also.
+ */
+ void reinitialize(boolean clearAll) {
+ if (clearAll) {
+ ramManager.reinitialize();
+ }
+ int allocatedSize = createNewMemoryBlock(defaultBlockSize,
+ defaultBlockSize);
+ assert(allocatedSize == defaultBlockSize || allocatedSize == 0);
+ LOG.debug("Created a new mem block of " + allocatedSize);
+ }
+
+ private int createNewMemoryBlock(int requestedSize, int minSize) {
+ int allocatedSize = ramManager.reserve(requestedSize, minSize);
+ usedSize = 0;
+ if (allocatedSize == 0) {
+ dataOut = null;
+ blockSize = 0;
+ } else {
+ dataOut = new DataOutputBuffer(allocatedSize);
+ blockSize = allocatedSize;
+ }
+ return allocatedSize;
+ }
+
+ /**
+ * This method determines if there is enough space left in the
+ * memory cache to write the requested length plus space for the
+ * subsequent EOF markers.
+ * @param length
+ * @return true if enough space is available
+ */
+ boolean reserveSpace(int length) throws IOException {
+ int availableSize = blockSize - usedSize;
+ if (availableSize >= length + EOF_MARKER_SIZE) {
+ return true;
+ }
+ // Not enough available. Close this block
+ assert (!inReset);
+
+ createInMemorySegment();
+
+ // Create a new block
+ int tmp = Math.max(length + EOF_MARKER_SIZE, defaultBlockSize);
+ availableSize = createNewMemoryBlock(tmp,
+ (length + EOF_MARKER_SIZE));
+
+ return availableSize != 0;
+ }
+
+ boolean reserveSpace(DataInputBuffer key, DataInputBuffer value)
+ throws IOException {
+ int keyLength = key.getLength() - key.getPosition();
+ int valueLength = value.getLength() - value.getPosition();
+
+ int requestedSize = keyLength + valueLength +
+ WritableUtils.getVIntSize(keyLength) +
+ WritableUtils.getVIntSize(valueLength);
+ return reserveSpace(requestedSize);
+ }
+
+ /**
+ * Write the key and value to the cache in the IFile format
+ * @param key
+ * @param value
+ * @throws IOException
+ */
+ public void write(DataInputBuffer key, DataInputBuffer value)
+ throws IOException {
+ int keyLength = key.getLength() - key.getPosition();
+ int valueLength = value.getLength() - value.getPosition();
+ WritableUtils.writeVInt(dataOut, keyLength);
+ WritableUtils.writeVInt(dataOut, valueLength);
+ dataOut.write(key.getData(), key.getPosition(), keyLength);
+ dataOut.write(value.getData(), value.getPosition(), valueLength);
+ usedSize += keyLength + valueLength +
+ WritableUtils.getVIntSize(keyLength) +
+ WritableUtils.getVIntSize(valueLength);
+ LOG.debug("ID: " + segmentList.size() + " WRITE TO MEM");
+ }
+
+ /**
+ * This method creates a memory segment from the existing buffer
+ * @throws IOException
+ */
+ void createInMemorySegment () throws IOException {
+
+ // If nothing was written in this block because the record size
+ // was greater than the allocated block size, just return.
+ if (usedSize == 0) {
+ ramManager.unreserve(blockSize);
+ return;
+ }
+
+ // reserveSpace would have ensured that there is enough space
+ // left for the EOF markers.
+ assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);
+
+ WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
+ WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
+
+ usedSize += EOF_MARKER_SIZE;
+
+ ramManager.unreserve(blockSize - usedSize);
+
+ Reader<K, V> reader =
+ new InMemoryReader<K, V>(ramManager,
+ (org.apache.hadoop.mapred.TaskAttemptID) tid,
+ dataOut.getData(), 0, usedSize);
+ Segment<K, V> segment = new Segment<K, V>(reader, false);
+ segmentList.add(segment);
+ LOG.debug("Added Memory Segment to List. List Size is " +
+ segmentList.size());
+ }
+ }
+
+ class FileCache {
+ private LocalDirAllocator lDirAlloc;
+ private final Configuration conf;
+ private final FileSystem fs;
+ private boolean isActive = false;
+
+ private Path file = null;
+ private IFile.Writer<K,V> writer = null;
+ private int spillNumber = 0;
+
+ public FileCache(Configuration conf)
+ throws IOException {
+ this.conf = conf;
+ this.fs = FileSystem.getLocal(conf);
+ this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+ }
+
+ void write(DataInputBuffer key, DataInputBuffer value)
+ throws IOException {
+ if (writer == null) {
+ // If spillNumber is 0, we should have called activate and not
+ // come here at all
+ assert (spillNumber != 0);
+ writer = createSpillFile();
+ }
+ writer.append(key, value);
+ LOG.debug("ID: " + segmentList.size() + " WRITE TO DISK");
+ }
+
+ void reinitialize() {
+ spillNumber = 0;
+ writer = null;
+ isActive = false;
+ }
+
+ void activate() throws IOException {
+ isActive = true;
+ writer = createSpillFile();
+ }
+
+ void createInDiskSegment() throws IOException {
+ assert (writer != null);
+ writer.close();
+ Segment<K,V> s = new Segment<K, V>(conf, fs, file, null, true);
+ writer = null;
+ segmentList.add(s);
+ LOG.debug("Disk Segment added to List. Size is " + segmentList.size());
+ }
+
+ boolean isActive() { return isActive; }
+
+ private Writer<K,V> createSpillFile() throws IOException {
+ Path tmp = new Path(
+ TaskTracker.getIntermediateOutputDir(
+ tid.getJobID().toString(), tid.toString()) +
+ "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out");
+
+ LOG.info("Created file: " + tmp);
+
+ file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(),
+ -1, conf);
+ return new Writer<K, V>(conf, fs, file);
+ }
+ }
+
+ static class BackupRamManager implements RamManager {
+
+ private int availableSize = 0;
+ private final int maxSize;
+
+ public BackupRamManager(int size) {
+ availableSize = maxSize = size;
+ }
+
+ @Override
+ public boolean reserve(int requestedSize, InputStream in) {
+ // Not used
+ LOG.warn("Reserve(int, InputStream) not supported by BackupRamManager");
+ return false;
+ }
+
+ int reserve(int requestedSize) {
+ if (availableSize == 0) {
+ return 0;
+ }
+ int reservedSize = Math.min(requestedSize, availableSize);
+ availableSize -= reservedSize;
+ LOG.debug("Reserving: " + reservedSize + " Requested: " + requestedSize);
+ return reservedSize;
+ }
+
+ int reserve(int requestedSize, int minSize) {
+ if (availableSize < minSize) {
+ LOG.debug("No Space available. Available: " + availableSize +
+ " MinSize: " + minSize);
+ return 0;
+ } else {
+ return reserve(requestedSize);
+ }
+ }
+
+ @Override
+ public void unreserve(int requestedSize) {
+ availableSize += requestedSize;
+ LOG.debug("Unreserving: " + requestedSize +
+ ". Available: " + availableSize);
+ }
+
+ void reinitialize() {
+ availableSize = maxSize;
+ }
+ }
+}
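
The write()/reserveSpace()/activate() path above follows a two-tier caching pattern: fill a bounded memory buffer first, then spill everything to disk once it overflows. A self-contained sketch of that idea, with all names illustrative and a plain list standing in for the IFile-based file cache:

    import java.util.ArrayList;
    import java.util.List;

    /** Buffers byte records in memory up to a budget, then spills to a
     *  list standing in for the file cache; mirrors BackupStore.write(). */
    class TwoTierBuffer {
      private final int memBudget;
      private int used = 0;
      private boolean fileActive = false;
      private final List<byte[]> mem = new ArrayList<byte[]>();
      private final List<byte[]> file = new ArrayList<byte[]>();

      TwoTierBuffer(int memBudget) { this.memBudget = memBudget; }

      void write(byte[] record) {
        if (fileActive) { file.add(record); return; } // once spilled, stay on disk
        if (used + record.length <= memBudget) {      // like reserveSpace()
          mem.add(record);
          used += record.length;
        } else {
          fileActive = true;                          // like activate()
          file.add(record);
        }
      }
    }
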
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Mon May 4 05:34:36 2009
@@ -19,6 +19,7 @@
import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
@@ -50,7 +51,7 @@
*/
class IFile {
- private static final int EOF_MARKER = -1;
+ static final int EOF_MARKER = -1;
/**
* <code>IFile.Writer</code> to write out intermediate map-outputs.
@@ -111,18 +112,31 @@
this.keyClass = keyClass;
this.valueClass = valueClass;
- SerializationFactory serializationFactory = new SerializationFactory(conf);
- this.keySerializer = serializationFactory.getSerializer(keyClass);
- this.keySerializer.open(buffer);
- this.valueSerializer = serializationFactory.getSerializer(valueClass);
- this.valueSerializer.open(buffer);
+
+ if (keyClass != null) {
+ SerializationFactory serializationFactory =
+ new SerializationFactory(conf);
+ this.keySerializer = serializationFactory.getSerializer(keyClass);
+ this.keySerializer.open(buffer);
+ this.valueSerializer = serializationFactory.getSerializer(valueClass);
+ this.valueSerializer.open(buffer);
+ }
+ }
+
+ public Writer(Configuration conf, FileSystem fs, Path file)
+ throws IOException {
+ this(conf, fs, file, null, null, null, null);
}
public void close() throws IOException {
- // Close the serializers
- keySerializer.close();
- valueSerializer.close();
+ // When the IFile writer is created by BackupStore, the key and
+ // value classes are not set, so check before closing the
+ // serializers
+ if (keyClass != null) {
+ keySerializer.close();
+ valueSerializer.close();
+ }
// Write EOF_MARKER for key/value length
WritableUtils.writeVInt(out, EOF_MARKER);
@@ -226,6 +240,17 @@
++numRecordsWritten;
}
+ // Required for mark/reset
+ public DataOutputStream getOutputStream () {
+ return out;
+ }
+
+ // Required for mark/reset
+ public void updateCountersForExternalAppend(long length) {
+ ++numRecordsWritten;
+ decompressedBytesWritten += length;
+ }
+
public long getRawLength() {
return decompressedBytesWritten;
}
@@ -423,6 +448,15 @@
readRecordsCounter.increment(numRecordsRead);
}
}
+
+ public void reset(int offset) {
+ return;
+ }
+
+ public void disableChecksumValidation() {
+ checksumIn.disableChecksumValidation();
+ }
+
}
/**
@@ -432,6 +466,8 @@
RamManager ramManager;
TaskAttemptID taskAttemptId;
DataInputBuffer memDataIn = new DataInputBuffer();
+ private int start;
+ private int length;
public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
byte[] data, int start, int length)
throws IOException {
@@ -442,6 +478,15 @@
buffer = data;
bufferSize = (int)fileLength;
memDataIn.reset(buffer, start, length);
+ this.start = start;
+ this.length = length;
+ }
+
+ @Override
+ public void reset(int offset) {
+ memDataIn.reset(buffer, start + offset, length);
+ bytesRead = offset;
+ eof = false;
}
@Override
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFileInputStream.java Mon May 4 05:34:36 2009
@@ -41,6 +41,8 @@
private byte csum[] = null;
private int checksumSize;
+ private boolean disableChecksumValidation = false;
+
/**
* Create a checksum input stream that reads
* @param in The input stream to be verified for checksum.
@@ -155,6 +157,10 @@
sum.update(b,off,bytesRead);
currentOffset += bytesRead;
+
+ if (disableChecksumValidation) {
+ return bytesRead;
+ }
if (currentOffset == dataLength) {
// The last four bytes are checksum. Strip them and verify
@@ -183,4 +189,8 @@
public byte[] getChecksum() {
return csum;
}
+
+ void disableChecksumValidation() {
+ disableChecksumValidation = true;
+ }
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Mon May 4 05:34:36 2009
@@ -163,7 +163,7 @@
this.segmentLength = reader.getLength();
}
- private void init(Counters.Counter readsCounter) throws IOException {
+ void init(Counters.Counter readsCounter) throws IOException {
if (reader == null) {
FSDataInputStream in = fs.open(file);
in.seek(segmentOffset);
@@ -195,9 +195,15 @@
reader.nextRawValue(value);
}
+ void closeReader() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ }
+
void close() throws IOException {
- reader.close();
-
+ closeReader();
if (!preserve && fs != null) {
fs.delete(file, false);
}
@@ -206,6 +212,27 @@
public long getPosition() throws IOException {
return reader.getPosition();
}
+
+ // This method is used by BackupStore to extract the
+ // absolute position after a reset
+ long getActualPosition() throws IOException {
+ return segmentOffset + reader.getPosition();
+ }
+
+ Reader<K,V> getReader() {
+ return reader;
+ }
+
+ // This method is used by BackupStore to reinitialize the
+ // reader to start reading from a different segment offset
+ void reinitReader(int offset) throws IOException {
+ if (!inMemory()) {
+ closeReader();
+ segmentOffset = offset;
+ segmentLength = fs.getFileStatus(file).getLen() - segmentOffset;
+ init(null);
+ }
+ }
}
private static class MergeQueue<K extends Object, V extends Object>
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIterator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIterator.java?rev=771181&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIterator.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIterator.java Mon May 4 05:34:36 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * <code>MarkableIterator</code> is a wrapper iterator class that
+ * implements the {@link MarkableIteratorInterface}.
+ *
+ */
+public class MarkableIterator<VALUE>
+ implements MarkableIteratorInterface<VALUE> {
+
+ MarkableIteratorInterface<VALUE> baseIterator;
+
+ /**
+ * Create a new iterator layered on the input iterator
+ * @param itr underlying iterator that implements MarkableIteratorInterface
+ */
+ public MarkableIterator(Iterator<VALUE> itr) {
+ if (!(itr instanceof MarkableIteratorInterface)) {
+ throw new IllegalArgumentException("Input Iterator not markable");
+ }
+ baseIterator = (MarkableIteratorInterface<VALUE>) itr;
+ }
+
+ @Override
+ public void mark() throws IOException {
+ baseIterator.mark();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ baseIterator.reset();
+ }
+
+ @Override
+ public void clearMark() throws IOException {
+ baseIterator.clearMark();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return baseIterator.hasNext();
+ }
+
+ @Override
+ public VALUE next() {
+ return baseIterator.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove Not Implemented");
+ }
+}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIteratorInterface.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIteratorInterface.java?rev=771181&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIteratorInterface.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MarkableIteratorInterface.java Mon May 4 05:34:36 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * <code>MarkableIteratorInterface</code> is an interface for an iterator
+ * that supports mark-reset functionality.
+ *
+ * <p>Mark can be called at any point during the iteration process, and a
+ * subsequent reset rewinds the iterator to the most recently marked record.
+ *
+ */
+interface MarkableIteratorInterface<VALUE> extends Iterator<VALUE> {
+ /**
+ * Mark the current record. A subsequent call to reset will rewind
+ * the iterator to this record.
+ * @throws IOException
+ */
+ void mark() throws IOException;
+
+ /**
+ * Reset the iterator to the record that was most recently marked
+ * @throws IOException
+ */
+ void reset() throws IOException;
+
+ /**
+ * Clear any previously set mark
+ * @throws IOException
+ */
+ void clearMark() throws IOException;
+}
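
To make the mark/reset contract concrete, here is a toy list-backed iterator with the same three operations (illustrative only; the real implementation is the ValueIterator and BackupStore machinery elsewhere in this commit):

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;

    class ListMarkableIterator<V> implements Iterator<V> {
      private final List<V> list;
      private int pos = 0;       // index of the next record to serve
      private int markPos = -1;  // -1 means no mark is set

      ListMarkableIterator(List<V> list) { this.list = list; }

      /** Mark the current record so reset() re-serves it; if next() has
       *  not been called yet, a reset replays from the beginning. */
      public void mark() { markPos = (pos == 0) ? 0 : pos - 1; }

      public void reset() throws IOException {
        if (markPos < 0) {
          throw new IOException("Reset called without a previous mark");
        }
        pos = markPos;  // rewind to the marked record
      }

      public void clearMark() { markPos = -1; }  // forget the mark

      public boolean hasNext() { return pos < list.size(); }
      public V next() { return list.get(pos++); }
      public void remove() { throw new UnsupportedOperationException(); }
    }
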
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Mon May 4 05:34:36 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -26,8 +27,11 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.BackupStore;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.util.Progressable;
@@ -54,7 +58,16 @@
private DataInputBuffer buffer = new DataInputBuffer();
private BytesWritable currentRawKey = new BytesWritable();
private ValueIterable iterable = new ValueIterable();
-
+ private boolean isMarked = false;
+ private BackupStore<KEYIN,VALUEIN> backupStore;
+ private final SerializationFactory serializationFactory;
+ private final Class<KEYIN> keyClass;
+ private final Class<VALUEIN> valueClass;
+ private final Configuration conf;
+ private final TaskAttemptID taskid;
+ private int currentKeyLength = -1;
+ private int currentValueLength = -1;
+
public ReduceContext(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
Counter inputCounter,
@@ -69,12 +82,16 @@
this.input = input;
this.inputCounter = inputCounter;
this.comparator = comparator;
- SerializationFactory serializationFactory = new SerializationFactory(conf);
+ this.serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(buffer);
this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
this.valueDeserializer.open(buffer);
hasMore = input.next();
+ this.keyClass = keyClass;
+ this.valueClass = valueClass;
+ this.conf = conf;
+ this.taskid = taskid;
}
/** Start processing next unique key. */
@@ -100,23 +117,31 @@
return false;
}
firstValue = !nextKeyIsSame;
- DataInputBuffer next = input.getKey();
- currentRawKey.set(next.getData(), next.getPosition(),
- next.getLength() - next.getPosition());
+ DataInputBuffer nextKey = input.getKey();
+ currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
+ nextKey.getLength() - nextKey.getPosition());
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
key = keyDeserializer.deserialize(key);
- next = input.getValue();
- buffer.reset(next.getData(), next.getPosition(), next.getLength());
+ DataInputBuffer nextVal = input.getValue();
+ buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
value = valueDeserializer.deserialize(value);
+
+ currentKeyLength = nextKey.getLength() - nextKey.getPosition();
+ currentValueLength = nextVal.getLength() - nextVal.getPosition();
+
+ if (isMarked) {
+ backupStore.write(nextKey, nextVal);
+ }
+
hasMore = input.next();
inputCounter.increment(1);
if (hasMore) {
- next = input.getKey();
+ nextKey = input.getKey();
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
- currentRawKey.getLength(),
- next.getData(),
- next.getPosition(),
- next.getLength() - next.getPosition()
+ currentRawKey.getLength(),
+ nextKey.getData(),
+ nextKey.getPosition(),
+ nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
@@ -132,16 +157,51 @@
public VALUEIN getCurrentValue() {
return value;
}
+
- protected class ValueIterator implements Iterator<VALUEIN> {
+
+ protected class ValueIterator implements MarkableIteratorInterface<VALUEIN> {
+ private boolean inReset = false;
+ private boolean clearMarkFlag = false;
+
@Override
public boolean hasNext() {
+ try {
+ if (inReset && backupStore.hasNext()) {
+ return true;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("hasNext failed", e);
+ }
return firstValue || nextKeyIsSame;
}
@Override
public VALUEIN next() {
+ if (inReset) {
+ try {
+ if (backupStore.hasNext()) {
+ backupStore.next();
+ DataInputBuffer next = backupStore.nextValue();
+ buffer.reset(next.getData(), next.getPosition(), next.getLength());
+ value = valueDeserializer.deserialize(value);
+ return value;
+ } else {
+ inReset = false;
+ backupStore.exitResetMode();
+ if (clearMarkFlag) {
+ clearMarkFlag = false;
+ isMarked = false;
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException("next value iterator failed", e);
+ }
+ }
+
// if this is the first record, we don't need to advance
if (firstValue) {
firstValue = false;
@@ -168,7 +228,101 @@
public void remove() {
throw new UnsupportedOperationException("remove not implemented");
}
-
+
+ @Override
+ public void mark() throws IOException {
+ if (backupStore == null) {
+ backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
+ }
+ isMarked = true;
+ if (!inReset) {
+ backupStore.reinitialize();
+ if (currentKeyLength == -1) {
+ // The user has not called next() for this iterator yet, so
+ // there is no current record to mark and copy to backup store.
+ return;
+ }
+ assert (currentValueLength != -1);
+ int requestedSize = currentKeyLength + currentValueLength +
+ WritableUtils.getVIntSize(currentKeyLength) +
+ WritableUtils.getVIntSize(currentValueLength);
+ DataOutputStream out = backupStore.getOutputStream(requestedSize);
+ writeFirstKeyValueBytes(out);
+ backupStore.updateCounters(requestedSize);
+ } else {
+ backupStore.mark();
+ }
+ }
+
+ @Override
+ public void reset() throws IOException {
+ // The user called reset, but a clearMark was requested earlier
+ // during this iteration; honor the clearMark and then throw an
+ // exception
+ if (clearMarkFlag) {
+ clearMarkFlag = false;
+ backupStore.clearMark();
+ throw new IOException("Reset called without a previous mark");
+ }
+
+ if (!isMarked) {
+ throw new IOException("Reset called without a previous mark");
+ }
+ inReset = true;
+ backupStore.reset();
+ }
+
+ @Override
+ public void clearMark() throws IOException {
+ if (backupStore == null) {
+ return;
+ }
+ if (inReset) {
+ clearMarkFlag = true;
+ backupStore.clearMark();
+ } else {
+ inReset = isMarked = false;
+ backupStore.reinitialize();
+ }
+ }
+
+ /**
+ * This method is called when the reducer moves from one key to
+ * another.
+ * @throws IOException
+ */
+ void resetBackupStore() throws IOException {
+ if (backupStore == null) {
+ return;
+ }
+ inReset = isMarked = false;
+ backupStore.reinitialize();
+ currentKeyLength = -1;
+ }
+
+ /**
+ * This method is called to write the record that was most recently
+ * served (before the call to mark). Since the framework reads one
+ * record in advance, we re-serialize the current key and value to
+ * obtain this record.
+ * @param out
+ * @throws IOException
+ */
+ private void writeFirstKeyValueBytes(DataOutputStream out)
+ throws IOException {
+ assert (getCurrentKey() != null && getCurrentValue() != null);
+ WritableUtils.writeVInt(out, currentKeyLength);
+ WritableUtils.writeVInt(out, currentValueLength);
+ Serializer<KEYIN> keySerializer =
+ serializationFactory.getSerializer(keyClass);
+ keySerializer.open(out);
+ keySerializer.serialize(getCurrentKey());
+
+ Serializer<VALUEIN> valueSerializer =
+ serializationFactory.getSerializer(valueClass);
+ valueSerializer.open(out);
+ valueSerializer.serialize(getCurrentValue());
+ }
}
protected class ValueIterable implements Iterable<VALUEIN> {
@@ -189,4 +343,4 @@
Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
return iterable;
}
-}
\ No newline at end of file
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=771181&r1=771180&r2=771181&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Mon May 4 05:34:36 2009
@@ -168,11 +168,15 @@
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
+ @SuppressWarnings("unchecked")
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
+ // If a backup store is used, reset it
+ ((ReduceContext.ValueIterator)
+ (context.getValues().iterator())).resetBackupStore();
}
cleanup(context);
}
-}
\ No newline at end of file
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestValueIterReset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestValueIterReset.java?rev=771181&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestValueIterReset.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapreduce/TestValueIterReset.java Mon May 4 05:34:36 2009
@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputLogFilter;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+/**
+ * A JUnit test to test the Map-Reduce framework's support for the
+ * "mark-reset" functionality in Reduce Values Iterator
+ */
+public class TestValueIterReset extends TestCase {
+ private static final int NUM_MAPS = 1;
+ private static final int NUM_TESTS = 4;
+ private static final int NUM_VALUES = 40;
+
+ private static Path TEST_ROOT_DIR =
+ new Path(System.getProperty("test.build.data","/tmp"));
+ private static Configuration conf = new Configuration();
+ private static FileSystem localFs;
+ static {
+ try {
+ localFs = FileSystem.getLocal(conf);
+ } catch (IOException io) {
+ throw new RuntimeException("problem getting local fs", io);
+ }
+ }
+
+ private static final Log LOG =
+ LogFactory.getLog(TestValueIterReset.class);
+
+ public static class TestMapper
+ extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
+
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+
+ IntWritable outKey = new IntWritable();
+ IntWritable outValue = new IntWritable();
+
+ for (int j = 0; j < NUM_TESTS; j++) {
+ for (int i = 0; i < NUM_VALUES; i++) {
+ outKey.set(j);
+ outValue.set(i);
+ context.write(outKey, outValue);
+ }
+ }
+ }
+ }
+
+ public static class TestReducer
+ extends Reducer< IntWritable,IntWritable,IntWritable,IntWritable> {
+
+ public void reduce(IntWritable key, Iterable<IntWritable> values,
+ Context context) throws IOException, InterruptedException {
+
+ int errors = 0;
+
+ MarkableIterator<IntWritable> mitr =
+ new MarkableIterator<IntWritable>(values.iterator());
+
+ switch (key.get()) {
+ case 0:
+ errors += test0(key, mitr);
+ break;
+ case 1:
+ errors += test1(key, mitr);
+ break;
+ case 2:
+ errors += test2(key, mitr);
+ break;
+ case 3:
+ errors += test3(key, mitr);
+ break;
+ default:
+ break;
+ }
+ context.write(key, new IntWritable(errors));
+ }
+ }
+
+ /**
+ * Test the most common use case. Mark before start of the iteration and
+ * reset at the end to go over the entire list
+ * @param key
+ * @param values
+ * @return
+ * @throws IOException
+ */
+
+ private static int test0(IntWritable key,
+ MarkableIterator<IntWritable> values)
+ throws IOException {
+
+ int errors = 0;
+ IntWritable i;
+ ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
+
+
+ LOG.info("Executing TEST:0 for Key:"+ key.toString());
+
+ values.mark();
+ LOG.info("TEST:0. Marking");
+
+ while (values.hasNext()) {
+ i = values.next();
+ expectedValues.add(new IntWritable(i.get())); // copy; the iterator reuses the value object
+ LOG.info(key + ":" + i);
+ }
+
+ values.reset();
+ LOG.info("TEST:0. Reset");
+
+ int count = 0;
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+ if (i.get() != expectedValues.get(count).get()) {
+ LOG.info("TEST:0. Check:1 Expected: " + expectedValues.get(count) +
+ ", Got: " + i);
+ errors ++;
+ return errors;
+ }
+ count ++;
+ }
+
+ LOG.info("TEST:0 Done");
+ return errors;
+ }
+
+ /**
+ * Test the case where we do a mark outside of a reset. Test for both file
+ * and memory caches
+ * @param key
+ * @param values
+ * @return
+ * @throws IOException
+ */
+ private static int test1(IntWritable key,
+ MarkableIterator<IntWritable> values)
+ throws IOException {
+
+ IntWritable i;
+ int errors = 0;
+ int count = 0;
+
+ ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
+ ArrayList<IntWritable> expectedValues1 = new ArrayList<IntWritable>();
+
+ LOG.info("Executing TEST:1 for Key:" + key);
+
+ values.mark();
+ LOG.info("TEST:1. Marking");
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+ expectedValues.add(new IntWritable(i.get())); // copy; the value object is reused
+ if (count == 2) {
+ break;
+ }
+ count ++;
+ }
+
+ values.reset();
+ LOG.info("TEST:1. Reset");
+ count = 0;
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+
+ if (count < expectedValues.size()) {
+ if (i.get() != expectedValues.get(count).get()) {
+ errors ++;
+ LOG.info("TEST:1. Check:1 Expected: " + expectedValues.get(count) +
+ ", Got: " + i);
+ return errors;
+ }
+ }
+
+ // We have moved past the first mark, but are still in the memory cache
+ if (count == 3) {
+ values.mark();
+ LOG.info("TEST:1. Marking -- " + key + ": " + i);
+ }
+
+ if (count >= 3) {
+ expectedValues1.add(new IntWritable(i.get())); // copy; the value object is reused
+ }
+
+ if (count == 5) {
+ break;
+ }
+ count ++;
+ }
+
+ if (count < expectedValues.size()) {
+ LOG.info(("TEST:1 Check:2. Iterator returned lesser values"));
+ errors ++;
+ return errors;
+ }
+
+ values.reset();
+ count = 0;
+ LOG.info("TEST:1. Reset");
+ expectedValues.clear();
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+
+ if (count < expectedValues1.size()) {
+ if (i.get() != expectedValues1.get(count).get()) {
+ errors ++;
+ LOG.info("TEST:1. Check:3 Expected: " + expectedValues1.get(count)
+ + ", Got: " + i);
+ return errors;
+ }
+ }
+
+ // We have moved past the previous mark, but now we are in the file
+ // cache
+ if (count == 25) {
+ values.mark();
+ LOG.info("TEST:1. Marking -- " + key + ":" + i);
+ }
+
+ if (count >= 25) {
+ expectedValues.add(new IntWritable(i.get())); // copy; the value object is reused
+ }
+ count ++;
+ }
+
+ if (count < expectedValues1.size()) {
+ LOG.info(("TEST:1 Check:4. Iterator returned fewer values"));
+ errors ++;
+ return errors;
+ }
+
+ values.reset();
+ LOG.info("TEST:1. Reset");
+ count = 0;
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+
+ if (i.get() != expectedValues.get(count).get()) {
+ errors ++;
+ LOG.info("TEST:1. Check:5 Expected: " + expectedValues.get(count)
+ + ", Got: " + i);
+ return errors;
+ }
+ count ++;
+ }
+
+ LOG.info("TEST:1 Done");
+ return errors;
+ }
+
+ /**
+ * Test the case where we do a mark inside a reset. Test for both file
+ * and memory
+ * @param key
+ * @param values
+ * @return
+ * @throws IOException
+ */
+ private static int test2(IntWritable key,
+ MarkableIterator<IntWritable> values)
+ throws IOException {
+
+ IntWritable i;
+ int errors = 0;
+ int count = 0;
+
+ ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
+ ArrayList<IntWritable> expectedValues1 = new ArrayList<IntWritable>();
+
+ LOG.info("Executing TEST:2 for Key:" + key);
+
+ values.mark();
+ LOG.info("TEST:2 Marking");
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+ expectedValues.add(new IntWritable(i.get())); // copy; the value object is reused
+ if (count == 8) {
+ break;
+ }
+ count ++;
+ }
+
+ values.reset();
+ count = 0;
+ LOG.info("TEST:2 reset");
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+
+ if (count < expectedValues.size()) {
+ if (i.get() != expectedValues.get(count).get()) {
+ errors ++;
+ LOG.info("TEST:2. Check:1 Expected: " + expectedValues.get(count)
+ + ", Got: " + i);
+ return errors;
+ }
+ }
+
+ // We have moved past the first mark, but are still reading from the
+ // memory cache
+ if (count == 3) {
+ values.mark();
+ LOG.info("TEST:2. Marking -- " + key + ":" + i);
+ }
+
+ if (count >= 3) {
+ expectedValues1.add(new IntWritable(i.get())); // copy; the value object is reused
+ }
+ count ++;
+ }
+
+ values.reset();
+ LOG.info("TEST:2. Reset");
+ expectedValues.clear();
+ count = 0;
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+
+ if (count < expectedValues1.size()) {
+ if (i.get() != expectedValues1.get(count).get()) {
+ errors ++;
+ LOG.info("TEST:2. Check:2 Expected: " + expectedValues1.get(count)
+ + ", Got: " + i);
+ return errors;
+ }
+ }
+
+ // We have moved past the previous mark, but now we are in the file
+ // cache
+ if (count == 20) {
+ values.mark();
+ LOG.info("TEST:2. Marking -- " + key + ":" + i);
+ }
+
+ if (count >= 20) {
+ expectedValues.add(new IntWritable(i.get())); // copy; the value object is reused
+ }
+ count ++;
+ }
+
+ values.reset();
+ count = 0;
+ LOG.info("TEST:2. Reset");
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+
+ if (i.get() != expectedValues.get(count).get()) {
+ errors ++;
+ LOG.info("TEST:2. Check:3 Expected: " + expectedValues.get(count)
+ + ", Got: " + i);
+ return errors;
+ }
+ count ++;
+ }
+
+ LOG.info("TEST:2 Done");
+ return errors;
+ }
+
+ /**
+ * Test "clearMark"
+ * @param key
+ * @param values
+ * @return
+ * @throws IOException
+ */
+ private static int test3(IntWritable key,
+ MarkableIterator<IntWritable> values)
+ throws IOException {
+
+ int errors = 0;
+ IntWritable i;
+
+ ArrayList<IntWritable> expectedValues = new ArrayList<IntWritable>();
+
+ LOG.info("Executing TEST:3 for Key:" + key);
+
+ values.mark();
+ LOG.info("TEST:3. Marking");
+ int count = 0;
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+
+ if (count == 5) {
+ LOG.info("TEST:3. Clearing Mark");
+ values.clearMark();
+ }
+
+ if (count == 8) {
+ LOG.info("TEST:3. Marking -- " + key + ":" + i);
+ values.mark();
+ }
+
+ if (count >= 8) {
+ expectedValues.add(new IntWritable(i.get())); // copy; the value object is reused
+ }
+ count ++;
+ }
+
+ values.reset();
+ LOG.info("TEST:3. After reset");
+
+ if (!values.hasNext()) {
+ errors ++;
+ LOG.info("TEST:3, Check:1. HasNext returned false");
+ return errors;
+ }
+
+ count = 0;
+
+ while (values.hasNext()) {
+ i = values.next();
+ LOG.info(key + ":" + i);
+
+ if (count < expectedValues.size()) {
+ if (i.get() != expectedValues.get(count).get()) {
+ errors ++;
+ LOG.info("TEST:3. Check:2 Expected: " + expectedValues.get(count)
+ + ", Got: " + i);
+ return errors;
+ }
+ }
+
+ if (count == 10) {
+ values.clearMark();
+ LOG.info("TEST:3. After clear mark");
+ }
+ count ++;
+ }
+
+ boolean successfulClearMark = false;
+ try {
+ LOG.info("TEST:3. Before Reset");
+ values.reset();
+ } catch (IOException e) {
+ successfulClearMark = true;
+ }
+
+ if (!successfulClearMark) {
+ LOG.info("TEST:3 Check:4 reset was successfule even after clearMark");
+ errors ++;
+ return errors;
+ }
+
+ LOG.info("TEST:3 Done.");
+ return errors;
+ }
+
+
+ public void createInput() throws Exception {
+ // Just create one-line files. We use these only to
+ // control the number of map tasks
+ for (int i = 0; i < NUM_MAPS; i++) {
+ Path file = new Path(TEST_ROOT_DIR+"/in", "test" + i + ".txt");
+ localFs.delete(file, false);
+ OutputStream os = localFs.create(file);
+ Writer wr = new OutputStreamWriter(os);
+ wr.write("dummy");
+ wr.close();
+ }
+ }
+
+ public void testValueIterReset() {
+ try {
+ Configuration conf = new Configuration();
+ Job job = new Job(conf, "TestValueIterReset") ;
+ job.setJarByClass(TestValueIterReset.class);
+ job.setMapperClass(TestMapper.class);
+ job.setReducerClass(TestReducer.class);
+ job.setNumReduceTasks(NUM_TESTS);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(IntWritable.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(IntWritable.class);
+ job.getConfiguration().setInt("mapred.job.reduce.markreset.buffer.size",
+ 128);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ FileInputFormat.addInputPath(job,
+ new Path(TEST_ROOT_DIR + "/in"));
+ Path output = new Path(TEST_ROOT_DIR + "/out");
+ localFs.delete(output, true);
+ FileOutputFormat.setOutputPath(job, output);
+ createInput();
+ assertTrue(job.waitForCompletion(true));
+ validateOutput();
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertTrue(false);
+ }
+ }
+
+ private void validateOutput() throws IOException {
+ Path[] outputFiles = FileUtil.stat2Paths(
+ localFs.listStatus(new Path(TEST_ROOT_DIR + "/out"),
+ new OutputLogFilter()));
+ // Each reduce task writes one output file; validate all of them
+ for (Path outputFile : outputFiles) {
+ InputStream is = localFs.open(outputFile);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ String line = reader.readLine();
+ while (line != null) {
+ StringTokenizer tokeniz = new StringTokenizer(line, "\t");
+ String key = tokeniz.nextToken();
+ String value = tokeniz.nextToken();
+ LOG.info("Output: key: " + key + " value: " + value);
+ int errors = Integer.parseInt(value);
+ assertTrue(errors == 0);
+ line = reader.readLine();
+ }
+ reader.close();
+ }
+ }
+}