You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/06/28 23:05:40 UTC

svn commit: r417874 - in /lucene/hadoop/trunk: ./ src/examples/org/apache/hadoop/examples/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/...

Author: cutting
Date: Wed Jun 28 14:05:38 2006
New Revision: 417874

URL: http://svn.apache.org/viewvc?rev=417874&view=rev
Log:
HADOOP-318.  Keep slow DFS output from causing task timeouts.  Contributed by Milind.

Added:
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 28 14:05:38 2006
@@ -80,6 +80,12 @@
 19. HADOOP-27.  Don't allocate tasks to trackers whose local free
     space is too low.  (Johan Oskarson via cutting)
 
+20. HADOOP-318.  Keep slow DFS output from causing task timeouts.
+    This incompatibly changes some public interfaces, adding a
+    parameter to OutputFormat.getRecordWriter() and the new method
+    Reporter.progress(), but it makes lots of tasks succeed that were
+    previously failing.  (Milind Bhandarkar via cutting)
+
 
 Release 0.3.2 - 2006-06-09
 

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Wed Jun 28 14:05:38 2006
@@ -36,6 +36,7 @@
 	    pgd.addClass("randomwriter", RandomWriter.class, 
                         "A random writer benchmark that writes 10GB per node.");
             pgd.addClass("sort", Sort.class, "A sort benchmark that sorts the data written by the random writer.");
+            pgd.addClass("pi", PiBenchmark.class, "A benchmark that estimates Pi using monte-carlo method.");
             pgd.driver(argv);
 	}
 	catch(Throwable e){

Added: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java?rev=417874&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java (added)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiBenchmark.java Wed Jun 28 14:05:38 2006
@@ -0,0 +1,225 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.*;
+
+/**
+ * A Map-reduce program to estimaate the valu eof Pi using monte-carlo
+ * method.
+ *
+ * @author Milind Bhandarkar
+ */
+public class PiBenchmark {
+  
+  /**
+   * Mappper class for Pi estimation.
+   */
+  
+  public static class PiMapper extends MapReduceBase implements Mapper {
+    
+    /** Mapper configuration.
+     *
+     */
+    public void configure(JobConf job) {
+    }
+    
+    static Random r = new Random();
+    
+    long numInside = 0L;
+    long numOutside = 0L;
+    
+    /** Map method.
+     * @param key
+     * @param value not-used.
+     * @param out
+     * @param reporter
+     */
+    public void map(WritableComparable key,
+            Writable val,
+            OutputCollector out,
+            Reporter reporter) throws IOException {
+        long nSamples = ((LongWritable) key).get();
+        for(long idx = 0; idx < nSamples; idx++) {
+            double x = r.nextDouble();
+            double y = r.nextDouble();
+            double d = (x-0.5)*(x-0.5)+(y-0.5)*(y-0.5);
+            if (d > 0.25) {
+                numOutside++;
+            } else {
+                numInside++;
+            }
+            if (idx%1000 == 1) {
+                reporter.setStatus("Generated "+idx+" samples.");
+            }
+        }
+        out.collect(new LongWritable(0), new LongWritable(numOutside));
+        out.collect(new LongWritable(1), new LongWritable(numInside));
+    }
+    
+    public void close() {
+      // nothing
+    }
+  }
+  
+  public static class PiReducer extends MapReduceBase implements Reducer {
+      long numInside = 0;
+      long numOutside = 0;
+      JobConf conf;
+      
+      /** Reducer configuration.
+       *
+       */
+      public void configure(JobConf job) {
+          conf = job;
+      }
+      /** Reduce method.
+       * @ param key
+       * @param values
+       * @param output
+       * @param reporter
+       */
+      public void reduce(WritableComparable key,
+              Iterator values,
+              OutputCollector output,
+              Reporter reporter) throws IOException {
+          if (((LongWritable)key).get() == 1) {
+              while (values.hasNext()) {
+                  long num = ((LongWritable)values.next()).get();
+                  numInside += num;
+              }
+          } else {
+              while (values.hasNext()) {
+                  long num = ((LongWritable)values.next()).get();
+                  numOutside += num;
+              }
+          }
+      }
+      
+      public void close() throws IOException {
+        Path tmpDir = new Path("test-mini-mr");
+        Path outDir = new Path(tmpDir, "out");
+        Path outFile = new Path(outDir, "reduce-out");
+        FileSystem fileSys = FileSystem.get(conf);
+        SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, outFile,
+              LongWritable.class, LongWritable.class);
+        writer.append(new LongWritable(numInside), new LongWritable(numOutside));
+        writer.close();
+      }
+  }
+
+  /**
+   * This is the main driver for computing the value of Pi using
+   * monte-carlo method.
+   */
+  static double launch(int numMaps, long numPoints, String jt, String dfs)
+  throws IOException {
+
+    Configuration conf = new Configuration();
+    JobConf jobConf = new JobConf(conf, PiBenchmark.class);
+    if (jt != null) { jobConf.set("mapred.job.tracker", jt); }
+    if (dfs != null) { jobConf.set("fs.default.name", dfs); }
+    jobConf.setJobName("test-mini-mr");
+    
+    // turn off speculative execution, because DFS doesn't handle
+    // multiple writers to the same file.
+    jobConf.setSpeculativeExecution(false);
+    jobConf.setInputKeyClass(LongWritable.class);
+    jobConf.setInputValueClass(LongWritable.class);
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+        
+    jobConf.setOutputKeyClass(LongWritable.class);
+    jobConf.setOutputValueClass(LongWritable.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    
+    jobConf.setMapperClass(PiMapper.class);
+    jobConf.setReducerClass(PiReducer.class);
+    
+    jobConf.setNumReduceTasks(1);
+
+    Path tmpDir = new Path("test-mini-mr");
+    Path inDir = new Path(tmpDir, "in");
+    Path outDir = new Path(tmpDir, "out");
+    FileSystem fileSys = FileSystem.get(jobConf);
+    fileSys.delete(tmpDir);
+    fileSys.mkdirs(inDir);
+    
+    jobConf.setInputPath(inDir);
+    jobConf.setOutputPath(outDir);
+    
+    jobConf.setNumMapTasks(numMaps);
+    
+    for(int idx=0; idx < numMaps; ++idx) {
+      Path file = new Path(inDir, "part"+idx);
+      SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file,
+              LongWritable.class, LongWritable.class);
+      writer.append(new LongWritable(numPoints), new LongWritable(0));
+      writer.close();
+      System.out.println("Wrote input for Map #"+idx);
+    }
+    
+    double estimate = 0.0;
+    
+    try {
+      System.out.println("Starting Job");
+      long startTime = System.currentTimeMillis();
+      JobClient.runJob(jobConf);
+      System.out.println("Job Finished in "+
+              (double)(System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+      Path inFile = new Path(outDir, "reduce-out");
+      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
+              jobConf);
+      LongWritable numInside = new LongWritable();
+      LongWritable numOutside = new LongWritable();
+      reader.next(numInside, numOutside);
+      reader.close();
+      estimate = (double) (numInside.get()*4.0)/(numMaps*numPoints);
+    } finally {
+      fileSys.delete(tmpDir);
+    }
+    
+    return estimate;
+  }
+  
+  /**
+     * Launches all the tasks in order.
+     */
+    public static void main(String[] argv) throws Exception {
+        if (argv.length < 2) {
+            System.err.println("Usage: TestMiniMR <nMaps> <nSamples>");
+            return;
+        }
+
+        int nMaps = Integer.parseInt(argv[0]);
+        long nSamples = Long.parseLong(argv[1]);
+        
+        System.out.println("Number of Maps = "+nMaps+" Samples per Map = "+nSamples);
+        
+	System.out.println("Estimated value of PI is "+
+                launch(nMaps, nSamples, null, null));
+    }
+}

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Wed Jun 28 14:05:38 2006
@@ -76,7 +76,8 @@
       String filename = ((UTF8) value).toString();
       SequenceFile.Writer writer = 
         new SequenceFile.Writer(fileSys, new Path(filename), 
-                                BytesWritable.class, BytesWritable.class);
+                                BytesWritable.class, BytesWritable.class,
+                                reporter);
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         int keyLength = minKeySize + 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Jun 28 14:05:38 2006
@@ -216,7 +216,23 @@
     public FSOutputStream create( UTF8 src, 
                                   boolean overwrite
                                 ) throws IOException {
-      return create( src, overwrite, defaultReplication, defaultBlockSize);
+      return create( src, overwrite, defaultReplication, defaultBlockSize, null);
+    }
+    
+    /**
+     * Create a new dfs file and return an output stream for writing into it
+     * with write-progress reporting. 
+     * 
+     * @param src stream name
+     * @param overwrite do not check for file existence if true
+     * @return output stream
+     * @throws IOException
+     */
+    public FSOutputStream create( UTF8 src, 
+                                  boolean overwrite,
+                                  Progressable progress
+                                ) throws IOException {
+      return create( src, overwrite, defaultReplication, defaultBlockSize, null);
     }
     
     /**
@@ -234,15 +250,34 @@
                                   short replication,
                                   long blockSize
                                 ) throws IOException {
+      return create(src, overwrite, replication, blockSize, null);
+    }
+
+    /**
+     * Create a new dfs file with the specified block replication 
+     * with write-progress reporting and return an output stream for writing
+     * into the file.  
+     * 
+     * @param src stream name
+     * @param overwrite do not check for file existence if true
+     * @param replication block replication
+     * @return output stream
+     * @throws IOException
+     */
+    public FSOutputStream create( UTF8 src, 
+                                  boolean overwrite, 
+                                  short replication,
+                                  long blockSize,
+                                  Progressable progress
+                                ) throws IOException {
       checkOpen();
       FSOutputStream result = new DFSOutputStream(src, overwrite, 
-                                                  replication, blockSize);
+                                                  replication, blockSize, progress);
       synchronized (pendingCreates) {
         pendingCreates.put(src.toString(), result);
       }
       return result;
     }
-
     /**
      * Set replication for an existing file.
      * 
@@ -718,11 +753,13 @@
         private String datanodeName;
         private long blockSize;
 
+        private Progressable progress;
         /**
          * Create a new output stream to the given DataNode.
          */
         public DFSOutputStream(UTF8 src, boolean overwrite, 
-                               short replication, long blockSize
+                               short replication, long blockSize,
+                               Progressable progress
                                ) throws IOException {
             this.src = src;
             this.overwrite = overwrite;
@@ -730,6 +767,10 @@
             this.backupFile = newBackupFile();
             this.blockSize = blockSize;
             this.backupStream = new FileOutputStream(backupFile);
+            this.progress = progress;
+            if (progress != null) {
+                LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
+            }
         }
 
         private File newBackupFile() throws IOException {
@@ -980,6 +1021,7 @@
                     while (bytesRead > 0) {
                         blockStream.writeLong((long) bytesRead);
                         blockStream.write(buf, 0, bytesRead);
+                        if (progress != null) { progress.progress(); }
                         bytesRead = in.read(buf);
                     }
                     internalClose();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Jun 28 14:05:38 2006
@@ -212,6 +212,17 @@
         shutdown();
     }
     
+    private static class Count {
+        int value = 0;
+        Count(int init) { value = init; }
+        synchronized void incr() { value++; }
+        synchronized void decr() { value--; }
+        public String toString() { return Integer.toString(value); }
+        public int getValue() { return value; }
+    }
+    
+    Count xceiverCount = new Count(0);
+    
     /**
      * Main loop for the DataNode.  Runs until shutdown,
      * forever calling remote NameNode functions.
@@ -243,7 +254,8 @@
             BlockCommand cmd = namenode.sendHeartbeat(dnRegistration, 
                                                       data.getCapacity(), 
                                                       data.getRemaining(), 
-                                                      xmitsInProgress);
+                                                      xmitsInProgress,
+                                                      xceiverCount.getValue());
             //LOG.info("Just sent heartbeat, with name " + localName);
             lastHeartbeat = now;
 
@@ -345,6 +357,8 @@
       }
     } // offerService
 
+    
+    
     /**
      * Server used for receiving/sending a block of data.
      * This is created to listen for requests from clients or 
@@ -366,6 +380,7 @@
                     Socket s = ss.accept();
                     //s.setSoTimeout(READ_TIMEOUT);
                     data.checkDataDir();
+                    xceiverCount.incr();
                     new Daemon(new DataXceiver(s)).start();
                 }
                 ss.close();
@@ -393,6 +408,7 @@
         Socket s;
         public DataXceiver(Socket s) {
             this.s = s;
+            LOG.debug("Number of active connections is: "+xceiverCount);
         }
 
         /**
@@ -421,6 +437,8 @@
               LOG.warn("DataXCeiver", ie);
             } finally {
                 try {
+                    xceiverCount.decr();
+                    LOG.debug("Number of active connections is: "+xceiverCount);
                     s.close();
                 } catch (IOException ie2) {
                 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Wed Jun 28 14:05:38 2006
@@ -28,6 +28,7 @@
  * @author Mike Cafarella
  **************************************************/
 public class DatanodeInfo extends DatanodeID implements Writable, Comparable {
+  private int xceiverCount;
 
     static {                                      // register a ctor
       WritableFactories.setFactory
@@ -36,18 +37,18 @@
            public Writable newInstance() { return new DatanodeInfo(); }
          });
     }
-
+  /** number of active connections */
+  public int getXceiverCount() { return xceiverCount; }
+  
     private long capacityBytes, remainingBytes, lastUpdate;
     private volatile TreeSet blocks;
-
     /** Create an empty DatanodeInfo.
      */
     public DatanodeInfo() {
-        this(new String(), new String(), 0, 0);
+        this(new String(), new String(), 0, 0, 0);
     }
-
     public DatanodeInfo( DatanodeID nodeID ) {
-      this( nodeID.getName(), nodeID.getStorageID(), 0, 0);
+      this( nodeID.getName(), nodeID.getStorageID(), 0, 0, 0);
     }
     
    /**
@@ -55,22 +56,22 @@
     */
     public DatanodeInfo(DatanodeID nodeID, 
                         long capacity, 
-                        long remaining) {
-      this( nodeID.getName(), nodeID.getStorageID(), capacity, remaining );
+                        long remaining,
+                        int xceiverCount) {
+      this( nodeID.getName(), nodeID.getStorageID(), capacity, remaining, xceiverCount );
     }
-
    /**
     * @param name hostname:portNumber as String object.
     */
     public DatanodeInfo(String name, 
                         String storageID, 
                         long capacity, 
-                        long remaining) {
+                        long remaining,
+                        int xceiverCount) {
         super( name, storageID );
         this.blocks = new TreeSet();
-        updateHeartbeat(capacity, remaining);
+        updateHeartbeat(capacity, remaining, xceiverCount);
     }
-
    /**
     */
     public void updateBlocks(Block newBlocks[]) {
@@ -88,9 +89,10 @@
 
     /**
      */
-    public void updateHeartbeat(long capacity, long remaining) {
+    public void updateHeartbeat(long capacity, long remaining, int xceiverCount) {
         this.capacityBytes = capacity;
         this.remainingBytes = remaining;
+        this.xceiverCount = xceiverCount;
         this.lastUpdate = System.currentTimeMillis();
     }
 
@@ -119,7 +121,6 @@
         DatanodeInfo d = (DatanodeInfo) o;
         return name.compareTo(d.getName());
     }
-
     /////////////////////////////////////////////////
     // Writable
     /////////////////////////////////////////////////
@@ -131,6 +132,7 @@
         out.writeLong(capacityBytes);
         out.writeLong(remainingBytes);
         out.writeLong(lastUpdate);
+        out.writeInt(xceiverCount);
 
         /**
         out.writeInt(blocks.length);
@@ -151,7 +153,7 @@
         this.capacityBytes = in.readLong();
         this.remainingBytes = in.readLong();
         this.lastUpdate = in.readLong();
-
+        this.xceiverCount = in.readInt();
         /**
         int numBlocks = in.readInt();
         this.blocks = new Block[numBlocks];

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Wed Jun 28 14:05:38 2006
@@ -52,7 +52,8 @@
      */
     public BlockCommand sendHeartbeat(DatanodeRegistration registration,
                                       long capacity, long remaining,
-                                      int xmitsInProgress) throws IOException;
+                                      int xmitsInProgress,
+                                      int xceiverCount) throws IOException;
 
     /**
      * blockReport() tells the NameNode about all the locally-stored blocks.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Wed Jun 28 14:05:38 2006
@@ -22,6 +22,7 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
 
 /****************************************************************
  * Implementation of the abstract FileSystem for the DFS system.
@@ -96,6 +97,13 @@
       return dfs.create(getPath(f), overwrite, replication, blockSize);
     }
 
+    public FSOutputStream createRaw(Path f, boolean overwrite, 
+                                    short replication, long blockSize,
+                                    Progressable progress)
+      throws IOException {
+      return dfs.create(getPath(f), overwrite, replication, blockSize, progress);
+    }
+    
     public boolean setReplicationRaw( Path src, 
                                       short replication
                                     ) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Jun 28 14:05:38 2006
@@ -1132,7 +1132,8 @@
      */
     public synchronized void gotHeartbeat(DatanodeID nodeID,
                                           long capacity, 
-                                          long remaining) throws IOException {
+                                          long remaining,
+                                          int xceiverCount) throws IOException {
       synchronized (heartbeats) {
         synchronized (datanodeMap) {
           long capacityDiff = 0;
@@ -1143,7 +1144,8 @@
           if (nodeinfo == null) {
             NameNode.stateChangeLog.debug("BLOCK* NameSystem.gotHeartbeat: "
                     +"brand-new heartbeat from "+nodeID.getName() );
-            nodeinfo = new DatanodeInfo(nodeID, capacity, remaining);
+
+            nodeinfo = new DatanodeInfo(nodeID, capacity, remaining, xceiverCount);
             datanodeMap.put(nodeinfo.getStorageID(), nodeinfo);
             capacityDiff = capacity;
             remainingDiff = remaining;
@@ -1151,7 +1153,7 @@
             capacityDiff = capacity - nodeinfo.getCapacity();
             remainingDiff = remaining - nodeinfo.getRemaining();
             heartbeats.remove(nodeinfo);
-            nodeinfo.updateHeartbeat(capacity, remaining);
+            nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
           }
           heartbeats.add(nodeinfo);
           totalCapacity += capacityDiff;
@@ -1771,6 +1773,7 @@
             }
         }
 
+        double avgLoad = 0.0;
         //
         // Build list of machines we can actually choose from
         //
@@ -1779,8 +1782,11 @@
             DatanodeInfo node = (DatanodeInfo) it.next();
             if (! forbiddenMachines.contains(node.getHost())) {
                 targetList.add(node);
+                avgLoad += node.getXceiverCount();
             }
         }
+        if (targetList.size() > 0) { avgLoad = avgLoad/targetList.size(); }
+        
         Collections.shuffle(targetList);
         
         //
@@ -1795,7 +1801,8 @@
                 for (Iterator it = targetList.iterator(); it.hasNext(); ) {
                     DatanodeInfo node = (DatanodeInfo) it.next();
                     if (clientMachine.equals(node.getHost())) {
-                        if (node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) {
+                        if ((node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) &&
+                            (node.getXceiverCount() < (2.0 * avgLoad))) {
                             return node;
                         }
                     }
@@ -1807,7 +1814,8 @@
             //
             for (Iterator it = targetList.iterator(); it.hasNext(); ) {
                 DatanodeInfo node = (DatanodeInfo) it.next();
-                if (node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) {
+                if ((node.getRemaining() > blockSize * MIN_BLOCKS_FOR_WRITE) &&
+                    (node.getXceiverCount() < (2.0 * avgLoad))) {
                     return node;
                 }
             }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Wed Jun 28 14:05:38 2006
@@ -366,9 +366,10 @@
     public BlockCommand sendHeartbeat(DatanodeRegistration nodeReg,
                                       long capacity, 
                                       long remaining,
-                                      int xmitsInProgress) throws IOException {
+                                      int xmitsInProgress,
+                                      int xceiverCount) throws IOException {
         verifyRequest( nodeReg );
-        namesystem.gotHeartbeat( nodeReg, capacity, remaining );
+        namesystem.gotHeartbeat( nodeReg, capacity, remaining, xceiverCount );
         
         //
         // Only ask datanodes to perform block operations (transfer, delete) 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Wed Jun 28 14:05:38 2006
@@ -19,6 +19,7 @@
 import java.util.zip.Checksum;
 import java.util.zip.CRC32;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
 
 /** Utility that wraps a {@link FSOutputStream} in a {@link DataOutputStream},
  * buffers output through a {@link BufferedOutputStream} and creates a checksum
@@ -41,7 +42,18 @@
                   long blockSize,
                   Configuration conf)
       throws IOException {
-      super(fs.createRaw(file, overwrite, replication, blockSize));
+      this(fs, file, overwrite, replication, blockSize, conf, null);
+    }
+
+    public Summer(FileSystem fs, 
+                  Path file, 
+                  boolean overwrite, 
+                  short replication,
+                  long blockSize,
+                  Configuration conf,
+                  Progressable progress)
+      throws IOException {
+      super(fs.createRaw(file, overwrite, replication, blockSize, progress));
       this.bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
       this.sums = new FSDataOutputStream(
             fs.createRaw(FileSystem.getChecksumFile(file), true, 
@@ -50,7 +62,7 @@
       sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
       sums.writeInt(this.bytesPerSum);
     }
-
+    
     public void write(byte b[], int off, int len) throws IOException {
       int summed = 0;
       while (summed < len) {
@@ -137,6 +149,17 @@
             bufferSize));
   }
 
+  public FSDataOutputStream(FileSystem fs, Path file,
+                            boolean overwrite, Configuration conf,
+                            int bufferSize, short replication, long blockSize,
+                            Progressable progress)
+  throws IOException {
+    super(new Buffer(
+            new PositionCache(
+                new Summer(fs, file, overwrite, replication, blockSize, conf, progress)), 
+            bufferSize));
+  }
+  
   /** Construct without checksums. */
   private FSDataOutputStream(FSOutputStream out, Configuration conf) throws IOException {
     this(out, conf.getInt("io.file.buffer.size", 4096));

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Wed Jun 28 14:05:38 2006
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.dfs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.Progressable;
 
 /****************************************************************
  * An abstract base class for a fairly generic filesystem.  It
@@ -180,6 +181,18 @@
     }
 
     /**
+     * Create an FSDataOutputStream at the indicated Path with write-progress
+     * reporting.
+     * Files are overwritten by default.
+     */
+    public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
+      return create(f, true, 
+                    getConf().getInt("io.file.buffer.size", 4096),
+                    getDefaultReplication(),
+                    getDefaultBlockSize(), progress);
+    }
+
+    /**
      * Opens an FSDataOutputStream at the indicated Path.
      * Files are overwritten by default.
      */
@@ -192,6 +205,20 @@
     }
 
     /**
+     * Opens an FSDataOutputStream at the indicated Path with write-progress
+     * reporting.
+     * Files are overwritten by default.
+     */
+    public FSDataOutputStream create(Path f, short replication, Progressable progress)
+      throws IOException {
+      return create(f, true, 
+                    getConf().getInt("io.file.buffer.size", 4096),
+                    replication,
+                    getDefaultBlockSize(), progress);
+    }
+
+    
+    /**
      * Opens an FSDataOutputStream at the indicated Path.
      * @param f the file name to open
      * @param overwrite if a file with this name already exists, then if true,
@@ -208,6 +235,25 @@
     }
     
     /**
+     * Opens an FSDataOutputStream at the indicated Path with write-progress
+     * reporting.
+     * @param f the file name to open
+     * @param overwrite if a file with this name already exists, then if true,
+     *   the file will be overwritten, and if false an error will be thrown.
+     * @param bufferSize the size of the buffer to be used.
+     */
+    public FSDataOutputStream create( Path f, 
+                                      boolean overwrite,
+                                      int bufferSize,
+                                      Progressable progress
+                                    ) throws IOException {
+      return create( f, overwrite, bufferSize, 
+                     getDefaultReplication(),
+                     getDefaultBlockSize(), progress);
+    }
+    
+    
+    /**
      * Opens an FSDataOutputStream at the indicated Path.
      * @param f the file name to open
      * @param overwrite if a file with this name already exists, then if true,
@@ -225,6 +271,26 @@
                                     bufferSize, replication, blockSize );
     }
 
+    /**
+     * Opens an FSDataOutputStream at the indicated Path with write-progress
+     * reporting.
+     * @param f the file name to open
+     * @param overwrite if a file with this name already exists, then if true,
+     *   the file will be overwritten, and if false an error will be thrown.
+     * @param bufferSize the size of the buffer to be used.
+     * @param replication required block replication for the file. 
+     */
+    public FSDataOutputStream create( Path f, 
+                                      boolean overwrite,
+                                      int bufferSize,
+                                      short replication,
+                                      long blockSize,
+                                      Progressable progress
+                                    ) throws IOException {
+      return new FSDataOutputStream(this, f, overwrite, getConf(), 
+                                    bufferSize, replication, blockSize, progress );
+    }
+
     /** Opens an OutputStream at the indicated Path.
      * @param f the file name to open
      * @param overwrite if a file with this name already exists, then if true,
@@ -236,6 +302,18 @@
                                              long blockSize)
       throws IOException;
 
+    /** Opens an OutputStream at the indicated Path with write-progress
+     * reporting.
+     * @param f the file name to open
+     * @param overwrite if a file with this name already exists, then if true,
+     *   the file will be overwritten, and if false an error will be thrown.
+     * @param replication required block replication for the file. 
+     */
+    public abstract FSOutputStream createRaw(Path f, boolean overwrite, 
+                                             short replication,
+                                             long blockSize, Progressable progress)
+      throws IOException;
+    
     /** @deprecated Call {@link #createNewFile(Path)} instead. */
     public boolean createNewFile(File f) throws IOException {
       return createNewFile(new Path(f.toString()));

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Wed Jun 28 14:05:38 2006
@@ -19,10 +19,8 @@
 import java.io.*;
 import java.util.*;
 import java.nio.channels.*;
-
-import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
 
 /****************************************************************
  * Implement the FileSystem API for the native filesystem.
@@ -174,6 +172,13 @@
         return new LocalFSFileOutputStream(f);
     }
 
+    public FSOutputStream createRaw(Path f, boolean overwrite, 
+                                    short replication, long blockSize,
+                                    Progressable progress)
+      throws IOException {
+        // ignore write-progress reporter for local files
+        return createRaw(f, overwrite, replication, blockSize);
+    }
     /**
      * Replication is not supported for the local file system.
      */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Wed Jun 28 14:05:38 2006
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.Progressable;
 
 /** Support for flat files of binary key/value pairs. */
 public class SequenceFile {
@@ -88,6 +89,13 @@
       this(fs, name, keyClass, valClass, false);
     }
     
+    /** Create the named file with write-progress reporter. */
+    public Writer(FileSystem fs, Path name, Class keyClass, Class valClass,
+            Progressable progress)
+      throws IOException {
+      this(fs, name, keyClass, valClass, false, progress);
+    }
+    
     /** Create the named file.
      * @param compress if true, values are compressed.
      */
@@ -96,6 +104,17 @@
       throws IOException {
       this.target = name;
       init(fs.create(target), keyClass, valClass, compress);
+    }
+    
+    /** Create the named file with write-progress reporter.
+     * @param compress if true, values are compressed.
+     */
+    public Writer(FileSystem fs, Path name,
+                  Class keyClass, Class valClass, boolean compress,
+                  Progressable progress)
+      throws IOException {
+      this.target = name;
+      init(fs.create(target, progress), keyClass, valClass, compress);
     }
     
     /** Write to an arbitrary stream using a specified buffer size. */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Wed Jun 28 14:05:38 2006
@@ -27,15 +27,18 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
 
 /** An {@link OutputFormat} that writes {@link MapFile}s. */
 public class MapFileOutputFormat extends OutputFormatBase {
 
   public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
-                                      String name) throws IOException {
+                                      String name, Progressable progress)
+                                      throws IOException {
 
     Path file = new Path(job.getOutputPath(), name);
 
+    // ignore the progress parameter, since MapFile is local
     final MapFile.Writer out =
       new MapFile.Writer(fs, file.toString(),
                          job.getMapOutputKeyClass(),

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Wed Jun 28 14:05:38 2006
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.util.Progressable;
 
 /** The location of a map output file, as passed to a reduce task via the
  * {@link InterTrackerProtocol}. */ 
@@ -89,14 +90,6 @@
     return "http://" + host + ":" + port + "/getMapOutput.jsp?map=" + 
             mapTaskId;
   }
-
-  /**
-   * An interface for callbacks when an method makes some progress.
-   * @author Owen O'Malley
-   */
-  public static interface Pingable {
-    void ping();
-  }
   
   /**
    * Get the map output into a local file from the remote server.
@@ -110,7 +103,7 @@
   public long getFile(FileSystem fileSys, 
                       Path localFilename, 
                       int reduce,
-                      Pingable pingee) throws IOException {
+                      Progressable pingee) throws IOException {
     URL path = new URL(toString() + "&reduce=" + reduce);
     InputStream input = path.openConnection().getInputStream();
     OutputStream output = fileSys.create(localFilename);
@@ -122,7 +115,7 @@
         totalBytes += len;
         output.write(buffer, 0 ,len);
         if (pingee != null) {
-          pingee.ping();
+          pingee.progress();
         }
         len = input.read(buffer);
       }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java Wed Jun 28 14:05:38 2006
@@ -19,19 +19,22 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.Progressable;
 
 /** An output data format.  Output files are stored in a {@link
  * FileSystem}. */
 public interface OutputFormat {
 
-  /** Construct a {@link RecordWriter}.
+  /** Construct a {@link RecordWriter} with Progressable.
    *
    * @param fs the file system to write to
    * @param job the job whose output is being written
    * @param name the unique name for this part of the output
+   * @param progress mechanism for reporting progress while writing to file
    * @return a {@link RecordWriter}
    */
-  RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name)
+  RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name,
+          Progressable progress)
     throws IOException;
 
   /** Check whether the output specification for a job is appropriate.  Called

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Wed Jun 28 14:05:38 2006
@@ -20,11 +20,13 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
 
 /** A base class for {@link OutputFormat}. */
 public abstract class OutputFormatBase implements OutputFormat {
   public abstract RecordWriter getRecordWriter(FileSystem fs,
-                                               JobConf job, String name)
+                                               JobConf job, String name,
+                                               Progressable progress)
     throws IOException;
 
   public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jun 28 14:05:38 2006
@@ -223,10 +223,12 @@
 
     sortPhase.complete();                         // sort is complete
 
+    Reporter reporter = getReporter(umbilical, getProgress());
+    
     // make output collector
     String name = getOutputName(getPartition());
     final RecordWriter out =
-      job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, name);
+      job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, name, reporter);
     OutputCollector collector = new OutputCollector() {
         public void collect(WritableComparable key, Writable value)
           throws IOException {
@@ -237,7 +239,6 @@
     
     // apply reduce function
     SequenceFile.Reader in = new SequenceFile.Reader(lfs, sortedFile, job);
-    Reporter reporter = getReporter(umbilical, getProgress());
     long length = lfs.getLength(sortedFile);
     try {
       ValuesIterator values = new ValuesIterator(in, length, comparator,

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed Jun 28 14:05:38 2006
@@ -22,6 +22,7 @@
 import java.io.*;
 import java.util.*;
 import java.text.DecimalFormat;
+import org.apache.hadoop.util.Progressable;
 
 /** Runs a reduce task. */
 class ReduceTaskRunner extends TaskRunner {
@@ -115,7 +116,7 @@
     public MapOutputLocation getLocation() { return loc; }
   }
 
-  private static class PingTimer implements MapOutputLocation.Pingable {
+  private static class PingTimer implements Progressable {
     private long pingTime;
     
     public synchronized void reset() {
@@ -126,7 +127,7 @@
       return pingTime;
     }
     
-    public void ping() {
+    public void progress() {
       synchronized (this) {
         pingTime = System.currentTimeMillis();
       }
@@ -202,7 +203,7 @@
 
           try {
             start(loc);
-            pingTimer.ping();
+            pingTimer.progress();
             size = copyOutput(loc, pingTimer);
             pingTimer.reset();
           } catch (IOException e) {
@@ -222,7 +223,7 @@
      * @throws IOException if there is an error copying the file
      */
     private long copyOutput(MapOutputLocation loc, 
-                            MapOutputLocation.Pingable pingee)
+                            Progressable pingee)
     throws IOException {
 
       String reduceId = reduceTask.getTaskId();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Wed Jun 28 14:05:38 2006
@@ -17,9 +17,10 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import org.apache.hadoop.util.Progressable;
 
 /** Passed to application code to permit alteration of status. */
-public interface Reporter {
+public interface Reporter extends Progressable {
   /** Alter the application's status description.
    *
    * @param status a brief description of the current status

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Wed Jun 28 14:05:38 2006
@@ -27,12 +27,14 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
 
 /** An {@link OutputFormat} that writes {@link SequenceFile}s. */
 public class SequenceFileOutputFormat extends OutputFormatBase {
 
   public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
-                                      String name) throws IOException {
+                                      String name, Progressable progress)
+                                      throws IOException {
 
     Path file = new Path(job.getOutputPath(), name);
 
@@ -40,7 +42,8 @@
       new SequenceFile.Writer(fs, file,
                               job.getOutputKeyClass(),
                               job.getOutputValueClass(),
-                              job.getBoolean("mapred.output.compress", false));
+                              job.getBoolean("mapred.output.compress", false),
+                              progress);
 
     return new RecordWriter() {
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Jun 28 14:05:38 2006
@@ -96,7 +96,10 @@
     return new Reporter() {
         public void setStatus(String status) throws IOException {
           progress.setStatus(status);
-          reportProgress(umbilical);
+          progress();
+        }
+        public void progress() throws IOException {
+            reportProgress(umbilical);
         }
       };
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Wed Jun 28 14:05:38 2006
@@ -24,16 +24,17 @@
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
 
 /** An {@link OutputFormat} that writes plain text files. */
 public class TextOutputFormat extends OutputFormatBase {
 
   public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
-                                      String name) throws IOException {
+                                      String name, Progressable progress) throws IOException {
 
     Path file = new Path(job.getOutputPath(), name);
 
-    final FSDataOutputStream out = fs.create(file);
+    final FSDataOutputStream out = fs.create(file, progress);
 
     return new RecordWriter() {
         public synchronized void write(WritableComparable key, Writable value)

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java?rev=417874&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java Wed Jun 28 14:05:38 2006
@@ -0,0 +1,16 @@
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+
+
+
+/**
+ * An interface for callbacks when an method makes some progress.
+ * @author Owen O'Malley
+ */
+public interface Progressable {
+    /** callback for reporting progress. Used by DFSclient to report
+     * progress while writing a block of DFS file.
+     */
+    public void progress() throws IOException;
+}
\ No newline at end of file

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/DFSCIOTest.java Wed Jun 28 14:05:38 2006
@@ -381,6 +381,7 @@
     for( int i=0; i < nrFiles; i++)
       ioer.doIO(new Reporter() {
           public void setStatus(String status) throws IOException {}
+          public void progress() throws IOException {}
         },
                 BASE_FILE_NAME+Integer.toString(i), 
                 MEGA*fileSize );

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSIO.java Wed Jun 28 14:05:38 2006
@@ -288,6 +288,7 @@
     for( int i=0; i < nrFiles; i++)
       ioer.doIO(new Reporter() {
           public void setStatus(String status) throws IOException {}
+          public void progress() throws IOException {}
         },
                 BASE_FILE_NAME+Integer.toString(i), 
                 MEGA*fileSize );

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java Wed Jun 28 14:05:38 2006
@@ -40,6 +40,7 @@
     
     Reporter reporter = new Reporter() {
         public void setStatus(String status) throws IOException {}
+        public void progress() throws IOException {}
       };
     
     int seed = new Random().nextInt();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Wed Jun 28 14:05:38 2006
@@ -40,6 +40,7 @@
 
     Reporter reporter = new Reporter() {
         public void setStatus(String status) throws IOException {}
+        public void progress() throws IOException {}
       };
     
     int seed = new Random().nextInt();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java?rev=417874&r1=417873&r2=417874&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java Wed Jun 28 14:05:38 2006
@@ -46,6 +46,7 @@
     
     Reporter reporter = new Reporter() {
         public void setStatus(String status) throws IOException {}
+        public void progress() throws IOException {}
       };
     
     int seed = new Random().nextInt();