You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/28 23:05:40 UTC
svn commit: r417874 - in /lucene/hadoop/trunk: ./
src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/dfs/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/...
Author: cutting
Date: Wed Jun 28 14:05:38 2006
New Revision: 417874
URL: http://svn.apache.org/viewvc?rev=417874&view=rev
Log:
HADOOP-318. Keep slow DFS output from causing task timeouts. Contributed by Milind.
Added:
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 28 14:05:38 2006
@@ -80,6 +80,12 @@
19. HADOOP-27. Don't allocate tasks to trackers whose local free
space is too low. (Johan Oskarson via cutting)
+20. HADOOP-318. Keep slow DFS output from causing task timeouts.
+ This incompatibly changes some public interfaces, adding a
+ parameter to OutputFormat.getRecordWriter() and the new method
+ Reporter.progress(), but it makes lots of tasks succeed that were
+ previously failing. (Milind Bhandarkar via cutting)
+
Release 0.3.2 - 2006-06-09
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Wed Jun 28 14:05:38 2006
@@ -36,6 +36,7 @@
pgd.addClass("randomwriter", RandomWriter.class,
"A random writer benchmark that writes 10GB per node.");
pgd.addClass("sort", Sort.class, "A sort benchmark that sorts the data written by the random writer.");
+ pgd.addClass("pi", PiBenchmark.class, "A benchmark that estimates Pi using monte-carlo method.");
pgd.driver(argv);
}
catch(Throwable e){
Added: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java?rev=417874&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java (added)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java Wed Jun 28 14:05:38 2006
@@ -0,0 +1,225 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.*;
+
+/**
+ * A map-reduce program that estimates the value of Pi using the
+ * Monte Carlo method.
+ *
+ * <p>Each map draws random points in the unit square and counts how many
+ * fall inside the circle of radius 0.5 centred in that square. The single
+ * reduce sums those counts, and the driver turns them into the estimate
+ * 4 * inside / (numMaps * samplesPerMap).
+ *
+ * @author Milind Bhandarkar
+ */
+public class PiBenchmark {
+
+  /**
+   * Mapper class for Pi estimation.
+   *
+   * <p>The key of each input record is the number of samples to generate;
+   * the value is ignored. For every record the mapper emits
+   * (0, points outside the circle) and (1, points inside the circle).
+   */
+  public static class PiMapper extends MapReduceBase implements Mapper {
+
+    /** Mapper configuration; nothing to configure. */
+    public void configure(JobConf job) {
+    }
+
+    static Random r = new Random();
+
+    /**
+     * Generates the samples for one input record and emits its counts.
+     *
+     * @param key number of samples to generate (a {@link LongWritable})
+     * @param val not used
+     * @param out receives (0, numOutside) and (1, numInside)
+     * @param reporter used to publish a status message every 1000 samples
+     */
+    public void map(WritableComparable key,
+                    Writable val,
+                    OutputCollector out,
+                    Reporter reporter) throws IOException {
+      long nSamples = ((LongWritable) key).get();
+      // Counts are per record. Keeping them in instance fields would make a
+      // map that receives several records emit running totals, which the
+      // reducer would then double-count.
+      long numInside = 0L;
+      long numOutside = 0L;
+      for (long idx = 0; idx < nSamples; idx++) {
+        double x = r.nextDouble();
+        double y = r.nextDouble();
+        // squared distance from the centre (0.5, 0.5) of the unit square
+        double d = (x-0.5)*(x-0.5)+(y-0.5)*(y-0.5);
+        if (d > 0.25) {
+          numOutside++;
+        } else {
+          numInside++;
+        }
+        if (idx%1000 == 1) {
+          reporter.setStatus("Generated "+idx+" samples.");
+        }
+      }
+      out.collect(new LongWritable(0), new LongWritable(numOutside));
+      out.collect(new LongWritable(1), new LongWritable(numInside));
+    }
+
+    public void close() {
+      // nothing
+    }
+  }
+
+  /**
+   * Reducer class for Pi estimation.
+   *
+   * <p>Sums the inside (key 1) and outside (key 0) counts from all maps and,
+   * when closed, writes a single (numInside, numOutside) record to
+   * test-mini-mr/out/reduce-out, where {@link #launch} reads it back.
+   */
+  public static class PiReducer extends MapReduceBase implements Reducer {
+    long numInside = 0;
+    long numOutside = 0;
+    JobConf conf;
+
+    /** Remembers the job configuration for use in {@link #close()}. */
+    public void configure(JobConf job) {
+      conf = job;
+    }
+
+    /**
+     * Adds the counts for one key to the running totals.
+     *
+     * @param key 1 for inside counts, any other value for outside counts
+     * @param values the per-map counts for this key
+     * @param output not used; the result is written in {@link #close()}
+     * @param reporter not used
+     */
+    public void reduce(WritableComparable key,
+                       Iterator values,
+                       OutputCollector output,
+                       Reporter reporter) throws IOException {
+      long sum = 0;
+      while (values.hasNext()) {
+        sum += ((LongWritable) values.next()).get();
+      }
+      if (((LongWritable) key).get() == 1) {
+        numInside += sum;
+      } else {
+        numOutside += sum;
+      }
+    }
+
+    /** Writes the accumulated totals to the fixed result file. */
+    public void close() throws IOException {
+      Path tmpDir = new Path("test-mini-mr");
+      Path outDir = new Path(tmpDir, "out");
+      Path outFile = new Path(outDir, "reduce-out");
+      FileSystem fileSys = FileSystem.get(conf);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, outFile,
+          LongWritable.class, LongWritable.class);
+      try {
+        writer.append(new LongWritable(numInside), new LongWritable(numOutside));
+      } finally {
+        writer.close();
+      }
+    }
+  }
+
+  /**
+   * This is the main driver for computing the value of Pi using the
+   * Monte Carlo method.
+   *
+   * <p>Writes one input file per map (each holding a single record whose key
+   * is {@code numPoints}) under test-mini-mr/in, runs the job, reads the
+   * reducer's result from test-mini-mr/out/reduce-out, and always deletes
+   * the test-mini-mr directory afterwards.
+   *
+   * @param numMaps number of map tasks
+   * @param numPoints number of samples generated by each map
+   * @param jt job tracker address, or null to keep the configured value
+   * @param dfs file system name, or null to keep the configured value
+   * @return the estimated value of Pi
+   */
+  static double launch(int numMaps, long numPoints, String jt, String dfs)
+    throws IOException {
+
+    Configuration conf = new Configuration();
+    JobConf jobConf = new JobConf(conf, PiBenchmark.class);
+    if (jt != null) { jobConf.set("mapred.job.tracker", jt); }
+    if (dfs != null) { jobConf.set("fs.default.name", dfs); }
+    jobConf.setJobName("test-mini-mr");
+
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobConf.setSpeculativeExecution(false);
+    jobConf.setInputKeyClass(LongWritable.class);
+    jobConf.setInputValueClass(LongWritable.class);
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+
+    jobConf.setOutputKeyClass(LongWritable.class);
+    jobConf.setOutputValueClass(LongWritable.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    jobConf.setMapperClass(PiMapper.class);
+    jobConf.setReducerClass(PiReducer.class);
+
+    jobConf.setNumReduceTasks(1);
+
+    Path tmpDir = new Path("test-mini-mr");
+    Path inDir = new Path(tmpDir, "in");
+    Path outDir = new Path(tmpDir, "out");
+    FileSystem fileSys = FileSystem.get(jobConf);
+    fileSys.delete(tmpDir);
+    fileSys.mkdirs(inDir);
+
+    jobConf.setInputPath(inDir);
+    jobConf.setOutputPath(outDir);
+
+    jobConf.setNumMapTasks(numMaps);
+
+    double estimate = 0.0;
+
+    // Everything after the directory is created runs inside try/finally so
+    // the scratch directory is removed even if writing the input fails.
+    try {
+      for (int idx = 0; idx < numMaps; ++idx) {
+        Path file = new Path(inDir, "part"+idx);
+        SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file,
+            LongWritable.class, LongWritable.class);
+        try {
+          writer.append(new LongWritable(numPoints), new LongWritable(0));
+        } finally {
+          writer.close();
+        }
+        System.out.println("Wrote input for Map #"+idx);
+      }
+
+      System.out.println("Starting Job");
+      long startTime = System.currentTimeMillis();
+      JobClient.runJob(jobConf);
+      System.out.println("Job Finished in "+
+        (double)(System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+      Path inFile = new Path(outDir, "reduce-out");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
+          jobConf);
+      LongWritable numInside = new LongWritable();
+      LongWritable numOutside = new LongWritable();
+      try {
+        reader.next(numInside, numOutside);
+      } finally {
+        reader.close();
+      }
+      estimate = (numInside.get()*4.0)/(numMaps*numPoints);
+    } finally {
+      fileSys.delete(tmpDir);
+    }
+
+    return estimate;
+  }
+
+  /**
+   * Command-line entry point: {@code PiBenchmark <nMaps> <nSamples>}.
+   * Runs the estimation and prints the result.
+   */
+  public static void main(String[] argv) throws Exception {
+    if (argv.length < 2) {
+      System.err.println("Usage: PiBenchmark <nMaps> <nSamples>");
+      return;
+    }
+
+    int nMaps = Integer.parseInt(argv[0]);
+    long nSamples = Long.parseLong(argv[1]);
+
+    System.out.println("Number of Maps = "+nMaps+" Samples per Map = "+nSamples);
+
+    System.out.println("Estimated value of PI is "+
+        launch(nMaps, nSamples, null, null));
+  }
+}
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Wed Jun 28 14:05:38 2006
@@ -76,7 +76,8 @@
String filename = ((UTF8) value).toString();
SequenceFile.Writer writer =
new SequenceFile.Writer(fileSys, new Path(filename),
- BytesWritable.class, BytesWritable.class);
+ BytesWritable.class, BytesWritable.class,
+ reporter);
int itemCount = 0;
while (numBytesToWrite > 0) {
int keyLength = minKeySize +
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Jun 28 14:05:38 2006
@@ -216,7 +216,23 @@
public FSOutputStream create( UTF8 src,
boolean overwrite
) throws IOException {
- return create( src, overwrite, defaultReplication, defaultBlockSize);
+ return create( src, overwrite, defaultReplication, defaultBlockSize, null);
+ }
+
+ /**
+ * Create a new dfs file and return an output stream for writing into it
+ * with write-progress reporting.
+ *
+ * @param src stream name
+ * @param overwrite do not check for file existence if true
+ * @param progress write-progress callback handed to the output stream;
+ * may be null
+ * @return output stream
+ * @throws IOException
+ */
+ public FSOutputStream create( UTF8 src,
+ boolean overwrite,
+ Progressable progress
+ ) throws IOException {
+ // Forward the caller's callback; passing null here would silently
+ // disable progress reporting for this overload.
+ return create( src, overwrite, defaultReplication, defaultBlockSize, progress);
}
/**
@@ -234,15 +250,34 @@
short replication,
long blockSize
) throws IOException {
+ return create(src, overwrite, replication, blockSize, null);
+ }
+
+ /**
+ * Create a new dfs file with the specified block replication
+ * with write-progress reporting and return an output stream for writing
+ * into the file.
+ *
+ * @param src stream name
+ * @param overwrite do not check for file existence if true
+ * @param replication block replication
+ * @param blockSize block size for the new file
+ * @param progress write-progress callback passed to the DFSOutputStream,
+ * which calls progress() after each buffer it writes; may be null
+ * @return output stream
+ * @throws IOException
+ */
+ public FSOutputStream create( UTF8 src,
+ boolean overwrite,
+ short replication,
+ long blockSize,
+ Progressable progress
+ ) throws IOException {
checkOpen();
FSOutputStream result = new DFSOutputStream(src, overwrite,
- replication, blockSize);
+ replication, blockSize, progress);
synchronized (pendingCreates) {
pendingCreates.put(src.toString(), result);
}
return result;
}
-
/**
* Set replication for an existing file.
*
@@ -718,11 +753,13 @@
private String datanodeName;
private long blockSize;
+ private Progressable progress;
/**
* Create a new output stream to the given DataNode.
*/
public DFSOutputStream(UTF8 src, boolean overwrite,
- short replication, long blockSize
+ short replication, long blockSize,
+ Progressable progress
) throws IOException {
this.src = src;
this.overwrite = overwrite;
@@ -730,6 +767,10 @@
this.backupFile = newBackupFile();
this.blockSize = blockSize;
this.backupStream = new FileOutputStream(backupFile);
+ this.progress = progress;
+ if (progress != null) {
+ LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
+ }
}
private File newBackupFile() throws IOException {
@@ -980,6 +1021,7 @@
while (bytesRead > 0) {
blockStream.writeLong((long) bytesRead);
blockStream.write(buf, 0, bytesRead);
+ if (progress != null) { progress.progress(); }
bytesRead = in.read(buf);
}
internalClose();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Jun 28 14:05:38 2006
@@ -212,6 +212,17 @@
shutdown();
}
+  /**
+   * Thread-safe int counter. Used for the number of active data-transfer
+   * connections: incremented by the accept loop, decremented when a
+   * DataXceiver finishes, and read by the heartbeat loop and debug logging,
+   * which all run on different threads.
+   */
+  private static class Count {
+    private int value;
+    Count(int init) { value = init; }
+    synchronized void incr() { value++; }
+    synchronized void decr() { value--; }
+    // Reads are synchronized as well; otherwise the heartbeat thread is not
+    // guaranteed to observe updates made by the xceiver threads.
+    public synchronized String toString() { return Integer.toString(value); }
+    public synchronized int getValue() { return value; }
+  }
+
+  /** Number of DataXceiver threads currently serving a connection. */
+  Count xceiverCount = new Count(0);
+
/**
* Main loop for the DataNode. Runs until shutdown,
* forever calling remote NameNode functions.
@@ -243,7 +254,8 @@
BlockCommand cmd = namenode.sendHeartbeat(dnRegistration,
data.getCapacity(),
data.getRemaining(),
- xmitsInProgress);
+ xmitsInProgress,
+ xceiverCount.getValue());
//LOG.info("Just sent heartbeat, with name " + localName);
lastHeartbeat = now;
@@ -345,6 +357,8 @@
}
} // offerService
+
+
/**
* Server used for receiving/sending a block of data.
* This is created to listen for requests from clients or
@@ -366,6 +380,7 @@
Socket s = ss.accept();
//s.setSoTimeout(READ_TIMEOUT);
data.checkDataDir();
+ xceiverCount.incr();
new Daemon(new DataXceiver(s)).start();
}
ss.close();
@@ -393,6 +408,7 @@
Socket s;
public DataXceiver(Socket s) {
this.s = s;
+ // The accept loop increments xceiverCount before constructing this
+ // DataXceiver, so the logged value already includes this connection.
+ LOG.debug("Number of active connections is: "+xceiverCount);
}
/**
@@ -421,6 +437,8 @@
LOG.warn("DataXCeiver", ie);
} finally {
try {
+ xceiverCount.decr();
+ LOG.debug("Number of active connections is: "+xceiverCount);
s.close();
} catch (IOException ie2) {
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Wed Jun 28 14:05:38 2006
@@ -28,6 +28,7 @@
* @author Mike Cafarella
**************************************************/
public class DatanodeInfo extends DatanodeID implements Writable, Comparable {
+ private int xceiverCount;
static { // register a ctor
WritableFactories.setFactory
@@ -36,18 +37,18 @@
public Writable newInstance() { return new DatanodeInfo(); }
});
}
-
+ /**
+ * Number of active data-transfer (xceiver) connections on this node, as
+ * last recorded by a constructor, updateHeartbeat() or readFields().
+ */
+ public int getXceiverCount() { return xceiverCount; }
+
private long capacityBytes, remainingBytes, lastUpdate;
private volatile TreeSet blocks;
-
/** Create an empty DatanodeInfo.
*/
public DatanodeInfo() {
- this(new String(), new String(), 0, 0);
+ this(new String(), new String(), 0, 0, 0);
}
-
+ /** Create a DatanodeInfo for the given node with zero capacity,
+ * zero remaining space and no active connections. */
public DatanodeInfo( DatanodeID nodeID ) {
- this( nodeID.getName(), nodeID.getStorageID(), 0, 0);
+ this( nodeID.getName(), nodeID.getStorageID(), 0, 0, 0);
}
/**
@@ -55,22 +56,22 @@
*/
public DatanodeInfo(DatanodeID nodeID,
long capacity,
- long remaining) {
- this( nodeID.getName(), nodeID.getStorageID(), capacity, remaining );
+ long remaining,
+ int xceiverCount) {
+ this( nodeID.getName(), nodeID.getStorageID(), capacity, remaining, xceiverCount );
}
-
/**
* @param name hostname:portNumber as String object.
+ * @param storageID storage ID of the node
+ * @param capacity total capacity (bytes) reported by the node
+ * @param remaining remaining space reported by the node
+ * @param xceiverCount number of active data-transfer connections
*/
public DatanodeInfo(String name,
String storageID,
long capacity,
- long remaining) {
+ long remaining,
+ int xceiverCount) {
super( name, storageID );
this.blocks = new TreeSet();
- updateHeartbeat(capacity, remaining);
+ updateHeartbeat(capacity, remaining, xceiverCount);
}
-
/**
*/
public void updateBlocks(Block newBlocks[]) {
@@ -88,9 +89,10 @@
/**
+ * Record the latest heartbeat data for this node and set its last-update
+ * time to now.
+ * @param capacity total capacity (bytes) reported by the node
+ * @param remaining remaining space reported by the node
+ * @param xceiverCount number of active data-transfer connections
*/
- public void updateHeartbeat(long capacity, long remaining) {
+ public void updateHeartbeat(long capacity, long remaining, int xceiverCount) {
this.capacityBytes = capacity;
this.remainingBytes = remaining;
+ this.xceiverCount = xceiverCount;
this.lastUpdate = System.currentTimeMillis();
}
@@ -119,7 +121,6 @@
DatanodeInfo d = (DatanodeInfo) o;
return name.compareTo(d.getName());
}
-
/////////////////////////////////////////////////
// Writable
/////////////////////////////////////////////////
@@ -131,6 +132,7 @@
out.writeLong(capacityBytes);
out.writeLong(remainingBytes);
out.writeLong(lastUpdate);
+ out.writeInt(xceiverCount);
/**
out.writeInt(blocks.length);
@@ -151,7 +153,7 @@
this.capacityBytes = in.readLong();
this.remainingBytes = in.readLong();
this.lastUpdate = in.readLong();
-
+ this.xceiverCount = in.readInt();
/**
int numBlocks = in.readInt();
this.blocks = new Block[numBlocks];
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Wed Jun 28 14:05:38 2006
@@ -52,7 +52,8 @@
*/
public BlockCommand sendHeartbeat(DatanodeRegistration registration,
long capacity, long remaining,
- int xmitsInProgress) throws IOException;
+ int xmitsInProgress,
+ int xceiverCount) throws IOException;
/**
* blockReport() tells the NameNode about all the locally-stored blocks.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Wed Jun 28 14:05:38 2006
@@ -22,6 +22,7 @@
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
@@ -96,6 +97,13 @@
return dfs.create(getPath(f), overwrite, replication, blockSize);
}
+ /**
+ * Create a raw DFS output stream, forwarding the write-progress callback
+ * to the DFS client so it is invoked as data is written.
+ */
+ public FSOutputStream createRaw(Path f, boolean overwrite,
+ short replication, long blockSize,
+ Progressable progress)
+ throws IOException {
+ return dfs.create(getPath(f), overwrite, replication, blockSize, progress);
+ }
+
public boolean setReplicationRaw( Path src,
short replication
) throws IOException {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Jun 28 14:05:38 2006
@@ -1132,7 +1132,8 @@
*/
public synchronized void gotHeartbeat(DatanodeID nodeID,
long capacity,
- long remaining) throws IOException {
+ long remaining,
+ int xceiverCount) throws IOException {
synchronized (heartbeats) {
synchronized (datanodeMap) {
long capacityDiff = 0;
@@ -1143,7 +1144,8 @@
if (nodeinfo == null) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.gotHeartbeat: "
+"brand-new heartbeat from "+nodeID.getName() );
- nodeinfo = new DatanodeInfo(nodeID, capacity, remaining);
+
+ nodeinfo = new DatanodeInfo(nodeID, capacity, remaining, xceiverCount);
datanodeMap.put(nodeinfo.getStorageID(), nodeinfo);
capacityDiff = capacity;
remainingDiff = remaining;
@@ -1151,7 +1153,7 @@
capacityDiff = capacity - nodeinfo.getCapacity();
remainingDiff = remaining - nodeinfo.getRemaining();
heartbeats.remove(nodeinfo);
- nodeinfo.updateHeartbeat(capacity, remaining);
+ nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
}
heartbeats.add(nodeinfo);
totalCapacity += capacityDiff;
@@ -1771,6 +1773,7 @@
}
}
+ // Average number of active connections (xceivers) over the candidate
+ // targets; used below to skip nodes much busier than the average.
+ double avgLoad = 0.0;
//
// Build list of machines we can actually choose from
//
@@ -1779,8 +1782,11 @@
DatanodeInfo node = (DatanodeInfo) it.next();
if (! forbiddenMachines.contains(node.getHost())) {
targetList.add(node);
+ avgLoad += node.getXceiverCount();
}
}
+ if (targetList.size() > 0) { avgLoad = avgLoad/targetList.size(); }
+
Collections.shuffle(targetList);
//
@@ -1795,7 +1801,8 @@
for (Iterator it = targetList.iterator(); it.hasNext(); ) {
DatanodeInfo node = (DatanodeInfo) it.next();
if (clientMachine.equals(node.getHost())) {
- if (node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) {
+ // Use <= : on an idle cluster every load (and so avgLoad) is 0, and a
+ // strict < would reject every node, leaving no target at all.
+ if ((node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) &&
+ (node.getXceiverCount() <= (2.0 * avgLoad))) {
return node;
}
}
@@ -1807,7 +1814,8 @@
//
for (Iterator it = targetList.iterator(); it.hasNext(); ) {
DatanodeInfo node = (DatanodeInfo) it.next();
- if (node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) {
+ if ((node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) &&
+ (node.getXceiverCount() <= (2.0 * avgLoad))) {
return node;
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Wed Jun 28 14:05:38 2006
@@ -366,9 +366,10 @@
public BlockCommand sendHeartbeat(DatanodeRegistration nodeReg,
long capacity,
long remaining,
- int xmitsInProgress) throws IOException {
+ int xmitsInProgress,
+ int xceiverCount) throws IOException {
verifyRequest( nodeReg );
- namesystem.gotHeartbeat( nodeReg, capacity, remaining );
+ namesystem.gotHeartbeat( nodeReg, capacity, remaining, xceiverCount );
//
// Only ask datanodes to perform block operations (transfer, delete)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Wed Jun 28 14:05:38 2006
@@ -19,6 +19,7 @@
import java.util.zip.Checksum;
import java.util.zip.CRC32;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
/** Utility that wraps a {@link FSOutputStream} in a {@link DataOutputStream},
* buffers output through a {@link BufferedOutputStream} and creates a checksum
@@ -41,7 +42,18 @@
long blockSize,
Configuration conf)
throws IOException {
- super(fs.createRaw(file, overwrite, replication, blockSize));
+ this(fs, file, overwrite, replication, blockSize, conf, null);
+ }
+
+ public Summer(FileSystem fs,
+ Path file,
+ boolean overwrite,
+ short replication,
+ long blockSize,
+ Configuration conf,
+ Progressable progress)
+ throws IOException {
+ super(fs.createRaw(file, overwrite, replication, blockSize, progress));
this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
this.sums = new FSDataOutputStream(
fs.createRaw(FileSystem.getChecksumFile(file), true,
@@ -50,7 +62,7 @@
sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
sums.writeInt(this.bytesPerSum);
}
-
+
public void write(byte b[], int off, int len) throws IOException {
int summed = 0;
while (summed < len) {
@@ -137,6 +149,17 @@
bufferSize));
}
+ /**
+ * Construct a buffered, checksummed output stream for a file, passing the
+ * write-progress callback down to the raw stream created via createRaw().
+ */
+ public FSDataOutputStream(FileSystem fs, Path file,
+ boolean overwrite, Configuration conf,
+ int bufferSize, short replication, long blockSize,
+ Progressable progress)
+ throws IOException {
+ super(new Buffer(
+ new PositionCache(
+ new Summer(fs, file, overwrite, replication, blockSize, conf, progress)),
+ bufferSize));
+ }
+
/** Construct without checksums. */
private FSDataOutputStream(FSOutputStream out, Configuration conf) throws IOException {
this(out, conf.getInt("io.file.buffer.size", 4096));
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Wed Jun 28 14:05:38 2006
@@ -23,6 +23,7 @@
import org.apache.hadoop.dfs.*;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.Progressable;
/****************************************************************
* An abstract base class for a fairly generic filesystem. It
@@ -180,6 +181,18 @@
}
/**
+ * Create an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ * Files are overwritten by default.
+ * @param f the file name to open
+ * @param progress write-progress callback, passed down to createRaw()
+ */
+ public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
+ return create(f, true,
+ getConf().getInt("io.file.buffer.size", 4096),
+ getDefaultReplication(),
+ getDefaultBlockSize(), progress);
+ }
+
+ /**
* Opens an FSDataOutputStream at the indicated Path.
* Files are overwritten by default.
*/
@@ -192,6 +205,20 @@
}
/**
+ * Opens an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ * Files are overwritten by default.
+ * @param f the file name to open
+ * @param replication required block replication for the file.
+ * @param progress write-progress callback, passed down to createRaw()
+ */
+ public FSDataOutputStream create(Path f, short replication, Progressable progress)
+ throws IOException {
+ return create(f, true,
+ getConf().getInt("io.file.buffer.size", 4096),
+ replication,
+ getDefaultBlockSize(), progress);
+ }
+
+
+ /**
* Opens an FSDataOutputStream at the indicated Path.
* @param f the file name to open
* @param overwrite if a file with this name already exists, then if true,
@@ -208,6 +235,25 @@
}
/**
+ * Opens an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ * @param f the file name to open
+ * @param overwrite if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param bufferSize the size of the buffer to be used.
+ * @param progress write-progress callback, passed down to createRaw()
+ */
+ public FSDataOutputStream create( Path f,
+ boolean overwrite,
+ int bufferSize,
+ Progressable progress
+ ) throws IOException {
+ return create( f, overwrite, bufferSize,
+ getDefaultReplication(),
+ getDefaultBlockSize(), progress);
+ }
+
+
+ /**
* Opens an FSDataOutputStream at the indicated Path.
* @param f the file name to open
* @param overwrite if a file with this name already exists, then if true,
@@ -225,6 +271,26 @@
bufferSize, replication, blockSize );
}
+ /**
+ * Opens an FSDataOutputStream at the indicated Path with write-progress
+ * reporting.
+ * @param f the file name to open
+ * @param overwrite if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param bufferSize the size of the buffer to be used.
+ * @param replication required block replication for the file.
+ * @param blockSize block size for the new file.
+ * @param progress write-progress callback, passed down to createRaw()
+ */
+ public FSDataOutputStream create( Path f,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress
+ ) throws IOException {
+ return new FSDataOutputStream(this, f, overwrite, getConf(),
+ bufferSize, replication, blockSize, progress );
+ }
+
/** Opens an OutputStream at the indicated Path.
* @param f the file name to open
* @param overwrite if a file with this name already exists, then if true,
@@ -236,6 +302,18 @@
long blockSize)
throws IOException;
+ /** Opens an OutputStream at the indicated Path with write-progress
+ * reporting.
+ * This default implementation ignores {@code progress} and delegates to
+ * the four-argument createRaw(), so existing FileSystem subclasses keep
+ * compiling; file systems that can report progress should override it.
+ * @param f the file name to open
+ * @param overwrite if a file with this name already exists, then if true,
+ * the file will be overwritten, and if false an error will be thrown.
+ * @param replication required block replication for the file.
+ * @param blockSize block size for the new file.
+ * @param progress write-progress callback; ignored by this default.
+ */
+ public FSOutputStream createRaw(Path f, boolean overwrite,
+ short replication,
+ long blockSize, Progressable progress)
+ throws IOException {
+ return createRaw(f, overwrite, replication, blockSize);
+ }
+
/** @deprecated Call {@link #createNewFile(Path)} instead. */
public boolean createNewFile(File f) throws IOException {
return createNewFile(new Path(f.toString()));
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Wed Jun 28 14:05:38 2006
@@ -19,10 +19,8 @@
import java.io.*;
import java.util.*;
import java.nio.channels.*;
-
-import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
/****************************************************************
* Implement the FileSystem API for the native filesystem.
@@ -174,6 +172,13 @@
return new LocalFSFileOutputStream(f);
}
+ public FSOutputStream createRaw(Path f, boolean overwrite,
+ short replication, long blockSize,
+ Progressable progress)
+ throws IOException {
+ // progress is intentionally dropped for local files; delegate to the non-reporting overload
+ return createRaw(f, overwrite, replication, blockSize);
+ }
/**
* Replication is not supported for the local file system.
*/
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Wed Jun 28 14:05:38 2006
@@ -26,6 +26,7 @@
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.Progressable;
/** Support for flat files of binary key/value pairs. */
public class SequenceFile {
@@ -88,6 +89,13 @@
this(fs, name, keyClass, valClass, false);
}
+ /** Create the named file (values not compressed) with a write-progress reporter. */
+ public Writer(FileSystem fs, Path name, Class keyClass, Class valClass,
+ Progressable progress)
+ throws IOException {
+ this(fs, name, keyClass, valClass, false, progress);
+ }
+
/** Create the named file.
* @param compress if true, values are compressed.
*/
@@ -96,6 +104,17 @@
throws IOException {
this.target = name;
init(fs.create(target), keyClass, valClass, compress);
+ }
+
+ /** Create the named file with write-progress reporter.
+ * @param compress if true, values are compressed.
+ * @param progress write-progress callback passed through to fs.create. */
+ public Writer(FileSystem fs, Path name,
+ Class keyClass, Class valClass, boolean compress,
+ Progressable progress)
+ throws IOException {
+ this.target = name;
+ init(fs.create(target, progress), keyClass, valClass, compress);
}
/** Write to an arbitrary stream using a specified buffer size. */
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Wed Jun 28 14:05:38 2006
@@ -27,15 +27,18 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
/** An {@link OutputFormat} that writes {@link MapFile}s. */
public class MapFileOutputFormat extends OutputFormatBase {
public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
- String name) throws IOException {
+ String name, Progressable progress)
+ throws IOException {
Path file = new Path(job.getOutputPath(), name);
+ // NOTE(review): progress is ignored here, but fs comes from the caller (ReduceTask passes FileSystem.get(job)), so this MapFile is not necessarily local
final MapFile.Writer out =
new MapFile.Writer(fs, file.toString(),
job.getMapOutputKeyClass(),
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Wed Jun 28 14:05:38 2006
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.util.Progressable;
/** The location of a map output file, as passed to a reduce task via the
* {@link InterTrackerProtocol}. */
@@ -89,14 +90,6 @@
return "http://" + host + ":" + port + "/getMapOutput.jsp?map=" +
mapTaskId;
}
-
- /**
- * An interface for callbacks when an method makes some progress.
- * @author Owen O'Malley
- */
- public static interface Pingable {
- void ping();
- }
/**
* Get the map output into a local file from the remote server.
@@ -110,7 +103,7 @@
public long getFile(FileSystem fileSys,
Path localFilename,
int reduce,
- Pingable pingee) throws IOException {
+ Progressable pingee) throws IOException {
URL path = new URL(toString() + "&reduce=" + reduce);
InputStream input = path.openConnection().getInputStream();
OutputStream output = fileSys.create(localFilename);
@@ -122,7 +115,7 @@
totalBytes += len;
output.write(buffer, 0 ,len);
if (pingee != null) {
- pingee.ping();
+ pingee.progress();
}
len = input.read(buffer);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java Wed Jun 28 14:05:38 2006
@@ -19,19 +19,22 @@
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.Progressable;
/** An output data format. Output files are stored in a {@link
* FileSystem}. */
public interface OutputFormat {
- /** Construct a {@link RecordWriter}.
+ /** Construct a {@link RecordWriter} with Progressable.
*
* @param fs the file system to write to
* @param job the job whose output is being written
* @param name the unique name for this part of the output
+ * @param progress mechanism for reporting progress while writing to file
* @return a {@link RecordWriter}
*/
- RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name)
+ RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name,
+ Progressable progress)
throws IOException;
/** Check whether the output specification for a job is appropriate. Called
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Wed Jun 28 14:05:38 2006
@@ -20,11 +20,13 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
/** A base class for {@link OutputFormat}. */
public abstract class OutputFormatBase implements OutputFormat {
public abstract RecordWriter getRecordWriter(FileSystem fs,
- JobConf job, String name)
+ JobConf job, String name,
+ Progressable progress)
throws IOException;
public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jun 28 14:05:38 2006
@@ -223,10 +223,12 @@
sortPhase.complete(); // sort is complete
+ Reporter reporter = getReporter(umbilical, getProgress());
+
// make output collector
String name = getOutputName(getPartition());
final RecordWriter out =
- job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, name);
+ job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, name, reporter);
OutputCollector collector = new OutputCollector() {
public void collect(WritableComparable key, Writable value)
throws IOException {
@@ -237,7 +239,6 @@
// apply reduce function
SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);
- Reporter reporter = getReporter(umbilical, getProgress());
long length = lfs.getLength(sortedFile);
try {
ValuesIterator values = new ValuesIterator(in, length, comparator,
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed Jun 28 14:05:38 2006
@@ -22,6 +22,7 @@
import java.io.*;
import java.util.*;
import java.text.DecimalFormat;
+import org.apache.hadoop.util.Progressable;
/** Runs a reduce task. */
class ReduceTaskRunner extends TaskRunner {
@@ -115,7 +116,7 @@
public MapOutputLocation getLocation() { return loc; }
}
- private static class PingTimer implements MapOutputLocation.Pingable {
+ private static class PingTimer implements Progressable {
private long pingTime;
public synchronized void reset() {
@@ -126,7 +127,7 @@
return pingTime;
}
- public void ping() {
+ public void progress() {
synchronized (this) {
pingTime = System.currentTimeMillis();
}
@@ -202,7 +203,7 @@
try {
start(loc);
- pingTimer.ping();
+ pingTimer.progress();
size = copyOutput(loc, pingTimer);
pingTimer.reset();
} catch (IOException e) {
@@ -222,7 +223,7 @@
* @throws IOException if there is an error copying the file
*/
private long copyOutput(MapOutputLocation loc,
- MapOutputLocation.Pingable pingee)
+ Progressable pingee)
throws IOException {
String reduceId = reduceTask.getTaskId();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Wed Jun 28 14:05:38 2006
@@ -17,9 +17,10 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import org.apache.hadoop.util.Progressable;
/** Passed to application code to permit alteration of status. */
-public interface Reporter {
+public interface Reporter extends Progressable {
/** Alter the application's status description.
*
* @param status a brief description of the current status
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Wed Jun 28 14:05:38 2006
@@ -27,12 +27,14 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
public class SequenceFileOutputFormat extends OutputFormatBase {
public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
- String name) throws IOException {
+ String name, Progressable progress)
+ throws IOException {
Path file = new Path(job.getOutputPath(), name);
@@ -40,7 +42,8 @@
new SequenceFile.Writer(fs, file,
job.getOutputKeyClass(),
job.getOutputValueClass(),
- job.getBoolean("mapred.output.compress", false));
+ job.getBoolean("mapred.output.compress", false),
+ progress);
return new RecordWriter() {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Jun 28 14:05:38 2006
@@ -96,7 +96,10 @@
return new Reporter() {
public void setStatus(String status) throws IOException {
progress.setStatus(status);
- reportProgress(umbilical);
+ progress();
+ }
+ public void progress() throws IOException {
+ reportProgress(umbilical);
}
};
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Wed Jun 28 14:05:38 2006
@@ -24,16 +24,17 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
/** An {@link OutputFormat} that writes plain text files. */
public class TextOutputFormat extends OutputFormatBase {
public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
- String name) throws IOException {
+ String name, Progressable progress) throws IOException {
Path file = new Path(job.getOutputPath(), name);
- final FSDataOutputStream out = fs.create(file);
+ final FSDataOutputStream out = fs.create(file, progress);
return new RecordWriter() {
public synchronized void write(WritableComparable key, Writable value)
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java?rev=417874&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java Wed Jun 28 14:05:38 2006
@@ -0,0 +1,16 @@
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+
+
+
+/**
+ * An interface for callbacks when a method makes some progress.
+ * @author Owen O'Malley
+ */
+public interface Progressable {
+ /** Callback for reporting progress. Used, for example, by DFSClient to
+ * report progress while writing a block of a DFS file.
+ * @throws IOException if reporting progress fails */
+ public void progress() throws IOException;
+}
\ No newline at end of file
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java Wed Jun 28 14:05:38 2006
@@ -381,6 +381,7 @@
for( int i=0; i < nrFiles; i++)
ioer.doIO(new Reporter() {
public void setStatus(String status) throws IOException {}
+ public void progress() throws IOException {}
},
BASE_FILE_NAME+Integer.toString(i),
MEGA*fileSize );
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java Wed Jun 28 14:05:38 2006
@@ -288,6 +288,7 @@
for( int i=0; i < nrFiles; i++)
ioer.doIO(new Reporter() {
public void setStatus(String status) throws IOException {}
+ public void progress() throws IOException {}
},
BASE_FILE_NAME+Integer.toString(i),
MEGA*fileSize );
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java Wed Jun 28 14:05:38 2006
@@ -40,6 +40,7 @@
Reporter reporter = new Reporter() {
public void setStatus(String status) throws IOException {}
+ public void progress() throws IOException {}
};
int seed = new Random().nextInt();
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Wed Jun 28 14:05:38 2006
@@ -40,6 +40,7 @@
Reporter reporter = new Reporter() {
public void setStatus(String status) throws IOException {}
+ public void progress() throws IOException {}
};
int seed = new Random().nextInt();
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java Wed Jun 28 14:05:38 2006
@@ -46,6 +46,7 @@
Reporter reporter = new Reporter() {
public void setStatus(String status) throws IOException {}
+ public void progress() throws IOException {}
};
int seed = new Random().nextInt();