Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC

svn commit: r529410 [16/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/...

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Mon Apr 16 14:44:35 2007
@@ -47,7 +47,7 @@
   }
 
   public KeyValueLineRecordReader(Configuration job, FileSplit split)
-      throws IOException {
+    throws IOException {
     super(job, split);
     String sepStr = job.get("key.value.separator.in.input.line", "\t");
     this.separator = (byte) sepStr.charAt(0);
@@ -64,7 +64,7 @@
 
   /** Read key/value pair in a line. */
   public synchronized boolean next(Writable key, Writable value)
-      throws IOException {
+    throws IOException {
     Text tKey = (Text) key;
     Text tValue = (Text) value;
     byte[] line = null;
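
As the hunk above shows, KeyValueLineRecordReader splits each input line on the first occurrence of the character configured by "key.value.separator.in.input.line", defaulting to a tab. A rough usage sketch (not part of this commit; the driver class and the ':' separator are illustrative, and it assumes the companion KeyValueTextInputFormat is used):

  JobConf conf = new JobConf(MyJob.class);              // MyJob is a placeholder driver class
  conf.setInputFormat(KeyValueTextInputFormat.class);   // uses KeyValueLineRecordReader underneath
  conf.set("key.value.separator.in.input.line", ":");   // split "key:value" lines instead of tab-separated ones
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);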

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Apr 16 14:44:35 2007
@@ -278,7 +278,7 @@
 
   public JobStatus[] jobsToComplete() {return null;}
   public TaskCompletionEvent[] getTaskCompletionEvents(
-      String jobid, int fromEventId, int maxEvents) throws IOException{
+                                                       String jobid, int fromEventId, int maxEvents) throws IOException{
     return TaskCompletionEvent.EMPTY_ARRAY;
   }
   

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Mon Apr 16 14:44:35 2007
@@ -23,28 +23,28 @@
  * @author Mike Cafarella
  *******************************/
 interface MRConstants {
-    //
-    // Timeouts, constants
-    //
-    public static final long HEARTBEAT_INTERVAL = 10 * 1000;
-    public static final long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
+  //
+  // Timeouts, constants
+  //
+  public static final long HEARTBEAT_INTERVAL = 10 * 1000;
+  public static final long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
 
-    //for the inmemory filesystem (to do in-memory merge)
-    /**
-     * Constant denoting when a merge of in memory files will be triggered 
-     */
-    public static final float MAX_INMEM_FILESYS_USE = 0.5f;
-    /**
-     * Constant denoting the max size (in terms of the fraction of the total 
-     * size of the filesys) of a map output file that we will try
-     * to keep in mem. Ideally, this should be a factor of MAX_INMEM_FILESYS_USE
-     */
-    public static final float MAX_INMEM_FILESIZE_FRACTION =
-      MAX_INMEM_FILESYS_USE/2;
+  //for the inmemory filesystem (to do in-memory merge)
+  /**
+   * Constant denoting when a merge of in memory files will be triggered 
+   */
+  public static final float MAX_INMEM_FILESYS_USE = 0.5f;
+  /**
+   * Constant denoting the max size (in terms of the fraction of the total 
+   * size of the filesys) of a map output file that we will try
+   * to keep in mem. Ideally, this should be a factor of MAX_INMEM_FILESYS_USE
+   */
+  public static final float MAX_INMEM_FILESIZE_FRACTION =
+    MAX_INMEM_FILESYS_USE/2;
     
-    //
-    // Result codes
-    //
-    public static int SUCCESS = 0;
-    public static int FILE_NOT_FOUND = -1;
+  //
+  // Result codes
+  //
+  public static int SUCCESS = 0;
+  public static int FILE_NOT_FOUND = -1;
 }
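
For scale, the reformatted constants above mean that a merge of in-memory map outputs is triggered once the in-memory filesystem is half full (MAX_INMEM_FILESYS_USE = 0.5), and a single map output is only kept in memory if it is at most MAX_INMEM_FILESYS_USE/2 = 0.25 of that filesystem's capacity. A hypothetical illustration from within the mapred package (the 100 MB capacity is an assumption, not a value in this commit):

  long inMemFsBytes = 100L * 1024 * 1024;                                              // assumed in-memory fs capacity
  long mergeTrigger = (long) (MRConstants.MAX_INMEM_FILESYS_USE * inMemFsBytes);       // merge starts near 50 MB used
  long maxInMemFile = (long) (MRConstants.MAX_INMEM_FILESIZE_FRACTION * inMemFsBytes); // outputs over ~25 MB go to disk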

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Apr 16 14:44:35 2007
@@ -77,7 +77,7 @@
   }
 
   public boolean isMapTask() {
-      return true;
+    return true;
   }
 
   public void localizeConfiguration(JobConf conf) throws IOException {
@@ -118,7 +118,7 @@
     InputSplit split;
     try {
       split = (InputSplit) 
-         ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
+        ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
     } catch (ClassNotFoundException exp) {
       IOException wrap = new IOException("Split class " + splitClass + 
                                          " not found");
@@ -198,23 +198,23 @@
   private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
     //spawn a thread to give merge progress heartbeats
     Thread sortProgress = new Thread() {
-      public void run() {
-        LOG.debug("Started thread: " + getName());
-        while (true) {
-          try {
-            reportProgress(umbilical);
-            Thread.sleep(PROGRESS_INTERVAL);
-          } catch (InterruptedException e) {
+        public void run() {
+          LOG.debug("Started thread: " + getName());
+          while (true) {
+            try {
+              reportProgress(umbilical);
+              Thread.sleep(PROGRESS_INTERVAL);
+            } catch (InterruptedException e) {
               return;
-          } catch (Throwable e) {
+            } catch (Throwable e) {
               LOG.info("Thread Exception in " +
-                                 "reporting sort progress\n" +
-                                 StringUtils.stringifyException(e));
+                       "reporting sort progress\n" +
+                       StringUtils.stringifyException(e));
               continue;
+            }
           }
         }
-      }
-    };
+      };
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
     sortProgress.setDaemon(true);
     return sortProgress;
@@ -260,10 +260,10 @@
     private FSDataOutputStream indexOut;
     private long segmentStart;
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, 
-            Reporter reporter) throws IOException {
+                           Reporter reporter) throws IOException {
       this.partitions = job.getNumReduceTasks();
       this.partitioner = (Partitioner)ReflectionUtils.newInstance(
-                                      job.getPartitionerClass(), job);
+                                                                  job.getPartitionerClass(), job);
       maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024;
       keyValBuffer = new DataOutputBuffer();
 
@@ -284,21 +284,21 @@
         Class codecClass = 
           job.getMapOutputCompressorClass(DefaultCodec.class);
         codec = (CompressionCodec) 
-                   ReflectionUtils.newInstance(codecClass, job);
+          ReflectionUtils.newInstance(codecClass, job);
       }
       sortImpl = new BufferSorter[partitions];
       for (int i = 0; i < partitions; i++)
         sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
-                   job.getClass("map.sort.class", MergeSorter.class,
-                   BufferSorter.class), job);
+                                                                job.getClass("map.sort.class", MergeSorter.class,
+                                                                             BufferSorter.class), job);
     }
     public void startPartition(int partNumber) throws IOException {
       //We create the sort output as multiple sequence files within a spilled
       //file. So we create a writer for each partition. 
       segmentStart = out.getPos();
       writer =
-          SequenceFile.createWriter(job, out, job.getMapOutputKeyClass(),
-                  job.getMapOutputValueClass(), compressionType, codec);
+        SequenceFile.createWriter(job, out, job.getMapOutputKeyClass(),
+                                  job.getMapOutputValueClass(), compressionType, codec);
     }
     private void endPartition(int partNumber) throws IOException {
       //Need to write syncs especially if block compression is in use
@@ -311,7 +311,7 @@
     }
     
     public void collect(WritableComparable key,
-              Writable value) throws IOException {
+                        Writable value) throws IOException {
       
       if (key.getClass() != keyClass) {
         throw new IOException("Type mismatch in key from map: expected "
@@ -362,7 +362,7 @@
                                                              numSpills);
         indexOut = localFs.create(indexFilename);
         LOG.debug("opened "+
-        mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
+                  mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
           
         //invoke the sort
         for (int i = 0; i < partitions; i++) {
@@ -379,16 +379,16 @@
               //got all the input key/val, processed, and output the result 
               //key/vals before we write the partition header in the output file
               Reducer combiner = (Reducer)ReflectionUtils.newInstance(
-                                         job.getCombinerClass(), job);
+                                                                      job.getCombinerClass(), job);
               // make collector
               OutputCollector combineCollector = new OutputCollector() {
-                public void collect(WritableComparable key, Writable value)
-                  throws IOException {
-                  synchronized (this) {
-                    writer.append(key, value);
+                  public void collect(WritableComparable key, Writable value)
+                    throws IOException {
+                    synchronized (this) {
+                      writer.append(key, value);
+                    }
                   }
-                }
-              };
+                };
               combineAndSpill(rIter, combiner, combineCollector);
               combiner.close();
             }
@@ -404,10 +404,10 @@
     }
     
     private void combineAndSpill(RawKeyValueIterator resultIter, 
-    Reducer combiner, OutputCollector combineCollector) throws IOException {
+                                 Reducer combiner, OutputCollector combineCollector) throws IOException {
       //combine the key/value obtained from the offset & indices arrays.
       CombineValuesIterator values = new CombineValuesIterator(resultIter,
-              comparator, keyClass, valClass, job, reporter);
+                                                               comparator, keyClass, valClass, job, reporter);
       while (values.more()) {
         combiner.reduce(values.getKey(), values, combineCollector, reporter);
         values.nextKey();
@@ -459,7 +459,7 @@
                                                    4096);
       //The final index file output stream
       FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
-                                                           4096);
+                                                        4096);
       long segmentStart;
       
       if (numSpills == 0) {
@@ -467,8 +467,8 @@
         for (int i = 0; i < partitions; i++) {
           segmentStart = finalOut.getPos();
           SequenceFile.createWriter(job, finalOut, 
-                  job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
-                  compressionType, codec);
+                                    job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
+                                    compressionType, codec);
           finalIndexOut.writeLong(segmentStart);
           finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
         }
@@ -498,15 +498,15 @@
             long segmentLength = indexIn.readLong();
             indexIn.close();
             SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
-                segmentLength, filename[i]);
+                                                               segmentLength, filename[i]);
             s.preserveInput(true);
             s.doSync();
             segmentList.add(i, s);
           }
           segmentStart = finalOut.getPos();
           SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
-              job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
-              compressionType, codec);
+                                                                 job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
+                                                                 compressionType, codec);
           sorter.writeFile(sorter.merge(segmentList, new Path(getTaskId())), 
                            writer);
           //add a sync block - required esp. for block compression to ensure
@@ -537,9 +537,9 @@
     private class CombineValuesIterator extends ValuesIterator {
         
       public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, 
-              WritableComparator comparator, Class keyClass,
-              Class valClass, Configuration conf, Reporter reporter) 
-      throws IOException {
+                                   WritableComparator comparator, Class keyClass,
+                                   Class valClass, Configuration conf, Reporter reporter) 
+        throws IOException {
         super(in, comparator, keyClass, valClass, conf, reporter);
       }
       

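Two tunables surface in the MapOutputBuffer changes above: the spill buffer size read from "io.sort.mb" (100 MB by default) and the map-output compression codec. A hedged configuration sketch (assuming the usual JobConf setters; not code from this commit):

  JobConf conf = new JobConf();
  conf.setInt("io.sort.mb", 200);                        // grow the map-side sort buffer from the 100 MB default
  conf.setCompressMapOutput(true);                       // compress spilled/shuffled map output
  conf.setMapOutputCompressorClass(DefaultCodec.class);  // codec used when map output compression is on
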
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java Mon Apr 16 14:44:35 2007
@@ -47,7 +47,7 @@
     System.arraycopy(pointers, 0, pointersCopy, 0, count);
     m.mergeSort(pointers, pointersCopy, 0, count);
     return new MRSortResultIterator(super.keyValBuffer, pointersCopy, 
-           super.startOffsets, super.keyLengths, super.valueLengths);
+                                    super.startOffsets, super.keyLengths, super.valueLengths);
   }
   /** The implementation of the compare method from Comparator. This basically
    * forwards the call to the super class's compare. Note that

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon Apr 16 14:44:35 2007
@@ -66,7 +66,7 @@
   { 
     getProgress().setStatus("reduce"); 
     setPhase(TaskStatus.Phase.SHUFFLE);        // phase to start with 
- }
+  }
 
   private Progress copyPhase = getProgress().addPhase("copy");
   private Progress sortPhase  = getProgress().addPhase("sort");
@@ -87,7 +87,7 @@
   }
 
   public boolean isMapTask() {
-      return false;
+    return false;
   }
 
   public int getNumMaps() { return numMaps; }
@@ -208,10 +208,10 @@
   }
   private class ReduceValuesIterator extends ValuesIterator {
     public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
-                               WritableComparator comparator, Class keyClass,
-                               Class valClass,
-                               Configuration conf, Reporter reporter)
-    throws IOException {
+                                 WritableComparator comparator, Class keyClass,
+                                 Class valClass,
+                                 Configuration conf, Reporter reporter)
+      throws IOException {
       super(in, comparator, keyClass, valClass, conf, reporter);
     }
     public void informReduceProgress() {
@@ -232,7 +232,7 @@
     throws IOException {
     Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
-                                  job.getReducerClass(), job);
+                                                           job.getReducerClass(), job);
     FileSystem lfs = FileSystem.getLocal(job);
 
     copyPhase.complete();                         // copy is already complete
@@ -259,12 +259,12 @@
               reportProgress(umbilical);
               Thread.sleep(PROGRESS_INTERVAL);
             } catch (InterruptedException e) {
-                return;
+              return;
             } catch (Throwable e) {
-                System.out.println("Thread Exception in " +
-                                   "reporting sort progress\n" +
-                                   StringUtils.stringifyException(e));
-                continue;
+              System.out.println("Thread Exception in " +
+                                 "reporting sort progress\n" +
+                                 StringUtils.stringifyException(e));
+              continue;
             }
           }
         }
@@ -285,7 +285,7 @@
       SequenceFile.Sorter sorter =
         new SequenceFile.Sorter(lfs, comparator, valueClass, job);
       rIter = sorter.merge(mapFiles, tempDir, 
-                                    !conf.getKeepFailedTaskFiles()); // sort
+                           !conf.getKeepFailedTaskFiles()); // sort
 
     } finally {
       sortComplete = true;
@@ -302,8 +302,8 @@
     FileSystem fs = FileSystem.get(job) ;
 
     if( runSpeculative ){
-        fs = new PhasedFileSystem (fs , 
-                      getJobId(), getTipId(), getTaskId());
+      fs = new PhasedFileSystem (fs , 
+                                 getJobId(), getTipId(), getTaskId());
     }
     
     final RecordWriter out = 
@@ -323,7 +323,7 @@
       Class keyClass = job.getMapOutputKeyClass();
       Class valClass = job.getMapOutputValueClass();
       ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator, 
-                                  keyClass, valClass, job, reporter);
+                                                             keyClass, valClass, job, reporter);
       values.informReduceProgress();
       while (values.more()) {
         reporter.incrCounter(REDUCE_INPUT_GROUPS, 1);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon Apr 16 14:44:35 2007
@@ -274,7 +274,7 @@
             size = copyOutput(loc);
           } catch (IOException e) {
             LOG.warn(reduceTask.getTaskId() + " copy failed: " +
-                        loc.getMapTaskId() + " from " + loc.getHost());
+                     loc.getMapTaskId() + " from " + loc.getHost());
             LOG.warn(StringUtils.stringifyException(e));
           } finally {
             finish(size);
@@ -309,8 +309,8 @@
       Path tmpFilename = new Path(finalFilename + "-" + id);
       // this copies the map output file
       tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
-                               tmpFilename, reduceTask.getPartition(),
-                               STALLED_COPY_TIMEOUT);
+                                tmpFilename, reduceTask.getPartition(),
+                                STALLED_COPY_TIMEOUT);
       if (!neededOutputs.contains(loc.getMapId())) {
         if (tmpFilename != null) {
           FileSystem fs = tmpFilename.getFileSystem(conf);
@@ -352,7 +352,7 @@
                    " is " + inMemFileSys.getPercentUsed() + 
                    " full. Triggering merge");
           InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
-                                     (LocalFileSystem)localFileSys, sorter);
+                                                        (LocalFileSystem)localFileSys, sorter);
           m.setName("Thread for merging in memory files");
           m.setDaemon(true);
           mergeInProgress = true;
@@ -425,7 +425,7 @@
     //create an instance of the sorter
     sorter =
       new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), 
-          conf.getMapOutputValueClass(), conf);
+                              conf.getMapOutputValueClass(), conf);
     
     // hosts -> next contact time
     this.penaltyBox = new Hashtable();
@@ -437,7 +437,7 @@
 
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.shuffleMetrics = 
-        MetricsUtil.createRecord(metricsContext, "shuffleInput");
+      MetricsUtil.createRecord(metricsContext, "shuffleInput");
     this.shuffleMetrics.setTag("user", conf.getUser());
   }
 
@@ -452,7 +452,7 @@
     
     final int      numOutputs = reduceTask.getNumMaps();
     Map<Integer, MapOutputLocation> knownOutputs = 
-                                    new HashMap<Integer, MapOutputLocation>();
+      new HashMap<Integer, MapOutputLocation>();
     int            numInFlight = 0, numCopied = 0;
     int            lowThreshold = numCopiers*2;
     long           bytesTransferred = 0;
@@ -488,229 +488,229 @@
     pingTimer.setDaemon(true);
     pingTimer.start();
     try {
-    // loop until we get all required outputs or are killed
-    while (!killed && numCopied < numOutputs && mergeThrowable == null) {
+      // loop until we get all required outputs or are killed
+      while (!killed && numCopied < numOutputs && mergeThrowable == null) {
 
-      LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
-               " map output(s)");
+        LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
+                 " map output(s)");
 
-      if (!neededOutputs.isEmpty()) {
-        LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
-                 " map output location(s)");
-        try {
-          // Put the hash entries for the failed fetches. Entries here
-          // might be replaced by (mapId) hashkeys from new successful 
-          // Map executions, if the fetch failures were due to lost tasks.
-          // The replacements, if at all, will happen when we query the
-          // JobTracker and put the mapId hashkeys with new MapOutputLocations
-          // as values
-          knownOutputs.putAll(retryFetches);
-          // the call to queryJobTracker will modify fromEventId to a value
-          // that it should be for the next call to queryJobTracker
-          List <MapOutputLocation> locs = queryJobTracker(fromEventId, 
-                                                          jobClient);
+        if (!neededOutputs.isEmpty()) {
+          LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
+                   " map output location(s)");
+          try {
+            // Put the hash entries for the failed fetches. Entries here
+            // might be replaced by (mapId) hashkeys from new successful 
+            // Map executions, if the fetch failures were due to lost tasks.
+            // The replacements, if at all, will happen when we query the
+            // JobTracker and put the mapId hashkeys with new MapOutputLocations
+            // as values
+            knownOutputs.putAll(retryFetches);
+            // the call to queryJobTracker will modify fromEventId to a value
+            // that it should be for the next call to queryJobTracker
+            List <MapOutputLocation> locs = queryJobTracker(fromEventId, 
+                                                            jobClient);
           
-          // put discovered them on the known list
-          for (int i=0; i < locs.size(); i++) {
-            knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i));
+            // put discovered them on the known list
+            for (int i=0; i < locs.size(); i++) {
+              knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i));
+            }
+            LOG.info(reduceTask.getTaskId() +
+                     " Got " + locs.size() + 
+                     " new map outputs from jobtracker and " + retryFetches.size() +
+                     " map outputs from previous failures");
+            // clear the "failed" fetches hashmap
+            retryFetches.clear();
+          }
+          catch (IOException ie) {
+            LOG.warn(reduceTask.getTaskId() +
+                     " Problem locating map outputs: " +
+                     StringUtils.stringifyException(ie));
           }
-          LOG.info(reduceTask.getTaskId() +
-                " Got " + locs.size() + 
-                " new map outputs from jobtracker and " + retryFetches.size() +
-                " map outputs from previous failures");
-          // clear the "failed" fetches hashmap
-          retryFetches.clear();
-        }
-        catch (IOException ie) {
-          LOG.warn(reduceTask.getTaskId() +
-                      " Problem locating map outputs: " +
-                      StringUtils.stringifyException(ie));
         }
-      }
       
-      // now walk through the cache and schedule what we can
-      int numKnown = knownOutputs.size(), numScheduled = 0;
-      int numSlow = 0, numDups = 0;
-
-      LOG.info(reduceTask.getTaskId() + " Got " + numKnown + 
-               " known map output location(s); scheduling...");
-
-      synchronized (scheduledCopies) {
-        Iterator locIt = knownOutputs.values().iterator();
-
-        currentTime = System.currentTimeMillis();
-        while (locIt.hasNext()) {
-
-          MapOutputLocation loc = (MapOutputLocation)locIt.next();
-          Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
-          boolean penalized = false, duplicate = false;
+        // now walk through the cache and schedule what we can
+        int numKnown = knownOutputs.size(), numScheduled = 0;
+        int numSlow = 0, numDups = 0;
+
+        LOG.info(reduceTask.getTaskId() + " Got " + numKnown + 
+                 " known map output location(s); scheduling...");
+
+        synchronized (scheduledCopies) {
+          Iterator locIt = knownOutputs.values().iterator();
+
+          currentTime = System.currentTimeMillis();
+          while (locIt.hasNext()) {
+
+            MapOutputLocation loc = (MapOutputLocation)locIt.next();
+            Long penaltyEnd = (Long)penaltyBox.get(loc.getHost());
+            boolean penalized = false, duplicate = false;
  
-          if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
-            penalized = true; numSlow++;
-          }
-          if (uniqueHosts.contains(loc.getHost())) {
-            duplicate = true; numDups++;
-          }
+            if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) {
+              penalized = true; numSlow++;
+            }
+            if (uniqueHosts.contains(loc.getHost())) {
+              duplicate = true; numDups++;
+            }
  
-          if (!penalized && !duplicate) {
-            uniqueHosts.add(loc.getHost());
-            scheduledCopies.add(loc);
-            locIt.remove();  // remove from knownOutputs
-            numInFlight++; numScheduled++;
+            if (!penalized && !duplicate) {
+              uniqueHosts.add(loc.getHost());
+              scheduledCopies.add(loc);
+              locIt.remove();  // remove from knownOutputs
+              numInFlight++; numScheduled++;
+            }
           }
+          scheduledCopies.notifyAll();
         }
-        scheduledCopies.notifyAll();
-      }
-      LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
-               " of " + numKnown + " known outputs (" + numSlow +
-               " slow hosts and " + numDups + " dup hosts)");
+        LOG.info(reduceTask.getTaskId() + " Scheduled " + numScheduled +
+                 " of " + numKnown + " known outputs (" + numSlow +
+                 " slow hosts and " + numDups + " dup hosts)");
 
-      // if we have no copies in flight and we can't schedule anything
-      // new, just wait for a bit
-      try {
-        if (numInFlight == 0 && numScheduled == 0) {
-          Thread.sleep(5000);
-        }
-      } catch (InterruptedException e) { } // IGNORE
+        // if we have no copies in flight and we can't schedule anything
+        // new, just wait for a bit
+        try {
+          if (numInFlight == 0 && numScheduled == 0) {
+            Thread.sleep(5000);
+          }
+        } catch (InterruptedException e) { } // IGNORE
 
-      while (!killed && numInFlight > 0 && mergeThrowable == null) {
-        LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
-        CopyResult cr = getCopyResult();
+        while (!killed && numInFlight > 0 && mergeThrowable == null) {
+          LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight);
+          CopyResult cr = getCopyResult();
         
-        if (cr != null) {
-          if (cr.getSuccess()) {  // a successful copy
-            numCopied++;
-            bytesTransferred += cr.getSize();
+          if (cr != null) {
+            if (cr.getSuccess()) {  // a successful copy
+              numCopied++;
+              bytesTransferred += cr.getSize();
           
-            long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
-            float mbs = ((float)bytesTransferred)/(1024*1024);
-            float transferRate = mbs/secsSinceStart;
+              long secsSinceStart = (System.currentTimeMillis()-startTime)/1000+1;
+              float mbs = ((float)bytesTransferred)/(1024*1024);
+              float transferRate = mbs/secsSinceStart;
           
-            copyPhase.startNextPhase();
-            copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + 
-                                " at " +
-                                mbpsFormat.format(transferRate) +  " MB/s)");          
-          } else if (cr.isObsolete()) {
-            //ignore
-            LOG.info(reduceTask.getTaskId() + 
-                " Ignoring obsolete copy result for Map Task: " + 
-                cr.getLocation().getMapTaskId() + " from host: " + 
-                cr.getHost());
-          } else {
-            retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
+              copyPhase.startNextPhase();
+              copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + 
+                                  " at " +
+                                  mbpsFormat.format(transferRate) +  " MB/s)");          
+            } else if (cr.isObsolete()) {
+              //ignore
+              LOG.info(reduceTask.getTaskId() + 
+                       " Ignoring obsolete copy result for Map Task: " + 
+                       cr.getLocation().getMapTaskId() + " from host: " + 
+                       cr.getHost());
+            } else {
+              retryFetches.put(new Integer(cr.getMapId()), cr.getLocation());
           
-            // wait a random amount of time for next contact
-            currentTime = System.currentTimeMillis();
-            long nextContact = currentTime + 60 * 1000 +
-                               backoff.nextInt(maxBackoff*1000);
-            penaltyBox.put(cr.getHost(), new Long(nextContact));          
-            LOG.warn(reduceTask.getTaskId() + " adding host " +
-                     cr.getHost() + " to penalty box, next contact in " +
-                     ((nextContact-currentTime)/1000) + " seconds");
-
-            // other outputs from the failed host may be present in the
-            // knownOutputs cache, purge them. This is important in case
-            // the failure is due to a lost tasktracker (causes many
-            // unnecessary backoffs). If not, we only take a small hit
-            // polling the jobtracker a few more times
-            Iterator locIt = knownOutputs.values().iterator();
-            while (locIt.hasNext()) {
-              MapOutputLocation loc = (MapOutputLocation)locIt.next();
-              if (cr.getHost().equals(loc.getHost())) {
-                retryFetches.put(new Integer(loc.getMapId()), loc);
-                locIt.remove();
+              // wait a random amount of time for next contact
+              currentTime = System.currentTimeMillis();
+              long nextContact = currentTime + 60 * 1000 +
+                backoff.nextInt(maxBackoff*1000);
+              penaltyBox.put(cr.getHost(), new Long(nextContact));          
+              LOG.warn(reduceTask.getTaskId() + " adding host " +
+                       cr.getHost() + " to penalty box, next contact in " +
+                       ((nextContact-currentTime)/1000) + " seconds");
+
+              // other outputs from the failed host may be present in the
+              // knownOutputs cache, purge them. This is important in case
+              // the failure is due to a lost tasktracker (causes many
+              // unnecessary backoffs). If not, we only take a small hit
+              // polling the jobtracker a few more times
+              Iterator locIt = knownOutputs.values().iterator();
+              while (locIt.hasNext()) {
+                MapOutputLocation loc = (MapOutputLocation)locIt.next();
+                if (cr.getHost().equals(loc.getHost())) {
+                  retryFetches.put(new Integer(loc.getMapId()), loc);
+                  locIt.remove();
+                }
               }
             }
+            uniqueHosts.remove(cr.getHost());
+            numInFlight--;
           }
-          uniqueHosts.remove(cr.getHost());
-          numInFlight--;
-        }
         
-        boolean busy = true;
-        // ensure we have enough to keep us busy
-        if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
-          busy = false;
-        }
-        //Check whether we have more CopyResult to check. If there is none, and
-        //we are not busy enough, break
-        synchronized (copyResults) {
-          if (copyResults.size() == 0 && !busy) {
-            break;
+          boolean busy = true;
+          // ensure we have enough to keep us busy
+          if (numInFlight < lowThreshold && (numOutputs-numCopied) > probe_sample_size) {
+            busy = false;
+          }
+          //Check whether we have more CopyResult to check. If there is none, and
+          //we are not busy enough, break
+          synchronized (copyResults) {
+            if (copyResults.size() == 0 && !busy) {
+              break;
+            }
           }
         }
-      }
       
-    }
+      }
 
-    // all done, inform the copiers to exit
-    synchronized (copiers) {
-      synchronized (scheduledCopies) {
-        for (int i=0; i < copiers.length; i++) {
-          copiers[i].interrupt();
-          copiers[i] = null;
+      // all done, inform the copiers to exit
+      synchronized (copiers) {
+        synchronized (scheduledCopies) {
+          for (int i=0; i < copiers.length; i++) {
+            copiers[i].interrupt();
+            copiers[i] = null;
+          }
         }
       }
-    }
     
-    //Do a merge of in-memory files (if there are any)
-    if (!killed && mergeThrowable == null) {
-      try {
-        //wait for an ongoing merge (if it is in flight) to complete
-        while (mergeInProgress) {
-          Thread.sleep(200);
-        }
-        LOG.info(reduceTask.getTaskId() + 
-                 " Copying of all map outputs complete. " + 
-                 "Initiating the last merge on the remaining files in " + 
-                 inMemFileSys.getUri());
-        if (mergeThrowable != null) {
-          //this could happen if the merge that
-          //was in progress threw an exception
-          throw mergeThrowable;
-        }
-        //initiate merge
-        Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
-        if (inMemClosedFiles.length == 0) {
-          LOG.info(reduceTask.getTaskId() + "Nothing to merge from " + 
-              inMemFileSys.getUri());
-          return numCopied == numOutputs;
-        }
-        //name this output file same as the name of the first file that is 
-        //there in the current list of inmem files (this is guaranteed to be
-        //absent on the disk currently. So we don't overwrite a prev. 
-        //created spill). Also we need to create the output file now since
-        //it is not guaranteed that this file will be present after merge
-        //is called (we delete empty sequence files as soon as we see them
-        //in the merge method)
-        SequenceFile.Writer writer = sorter.cloneFileAttributes(
-            inMemFileSys.makeQualified(inMemClosedFiles[0]), 
-            localFileSys.makeQualified(inMemClosedFiles[0]), null);
-        
-        RawKeyValueIterator rIter = null;
+      //Do a merge of in-memory files (if there are any)
+      if (!killed && mergeThrowable == null) {
         try {
-          rIter = sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, 
-                       new Path(reduceTask.getTaskId()));
-        } catch (Exception e) { 
-          //make sure that we delete the ondisk file that we created earlier
-          //when we invoked cloneFileAttributes
+          //wait for an ongoing merge (if it is in flight) to complete
+          while (mergeInProgress) {
+            Thread.sleep(200);
+          }
+          LOG.info(reduceTask.getTaskId() + 
+                   " Copying of all map outputs complete. " + 
+                   "Initiating the last merge on the remaining files in " + 
+                   inMemFileSys.getUri());
+          if (mergeThrowable != null) {
+            //this could happen if the merge that
+            //was in progress threw an exception
+            throw mergeThrowable;
+          }
+          //initiate merge
+          Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
+          if (inMemClosedFiles.length == 0) {
+            LOG.info(reduceTask.getTaskId() + "Nothing to merge from " + 
+                     inMemFileSys.getUri());
+            return numCopied == numOutputs;
+          }
+          //name this output file same as the name of the first file that is 
+          //there in the current list of inmem files (this is guaranteed to be
+          //absent on the disk currently. So we don't overwrite a prev. 
+          //created spill). Also we need to create the output file now since
+          //it is not guaranteed that this file will be present after merge
+          //is called (we delete empty sequence files as soon as we see them
+          //in the merge method)
+          SequenceFile.Writer writer = sorter.cloneFileAttributes(
+                                                                  inMemFileSys.makeQualified(inMemClosedFiles[0]), 
+                                                                  localFileSys.makeQualified(inMemClosedFiles[0]), null);
+        
+          RawKeyValueIterator rIter = null;
+          try {
+            rIter = sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, 
+                                 new Path(reduceTask.getTaskId()));
+          } catch (Exception e) { 
+            //make sure that we delete the ondisk file that we created earlier
+            //when we invoked cloneFileAttributes
+            writer.close();
+            localFileSys.delete(inMemClosedFiles[0]);
+            throw new IOException (StringUtils.stringifyException(e));
+          }
+          sorter.writeFile(rIter, writer);
           writer.close();
-          localFileSys.delete(inMemClosedFiles[0]);
-          throw new IOException (StringUtils.stringifyException(e));
+          LOG.info(reduceTask.getTaskId() +
+                   " Merge of the " +inMemClosedFiles.length +
+                   " files in InMemoryFileSystem complete." +
+                   " Local file is " + inMemClosedFiles[0]);
+        } catch (Throwable t) {
+          LOG.warn(reduceTask.getTaskId() +
+                   " Final merge of the inmemory files threw an exception: " + 
+                   StringUtils.stringifyException(t));
+          return false;
         }
-        sorter.writeFile(rIter, writer);
-        writer.close();
-        LOG.info(reduceTask.getTaskId() +
-                 " Merge of the " +inMemClosedFiles.length +
-                 " files in InMemoryFileSystem complete." +
-                 " Local file is " + inMemClosedFiles[0]);
-      } catch (Throwable t) {
-        LOG.warn(reduceTask.getTaskId() +
-            " Final merge of the inmemory files threw an exception: " + 
-            StringUtils.stringifyException(t));
-        return false;
       }
-    }
-    return mergeThrowable == null && numCopied == numOutputs && !killed;
+      return mergeThrowable == null && numCopied == numOutputs && !killed;
     } finally {
       inMemFileSys.close();
       pingTimer.interrupt();
@@ -719,7 +719,7 @@
   
   
   private CopyResult getCopyResult() {  
-   synchronized (copyResults) {
+    synchronized (copyResults) {
       while (!killed && copyResults.isEmpty()) {
         try {
           copyResults.wait();
@@ -741,8 +741,8 @@
    * @throws IOException
    */  
   private List <MapOutputLocation> queryJobTracker(IntWritable fromEventId, 
-                                              InterTrackerProtocol jobClient)
-  throws IOException {
+                                                   InterTrackerProtocol jobClient)
+    throws IOException {
     
     long currentTime = System.currentTimeMillis();    
     long pollTime = lastPollTime + MIN_POLL_INTERVAL;
@@ -755,9 +755,9 @@
     lastPollTime = currentTime;
 
     TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
-                                      reduceTask.getJobId().toString(),
-                                      fromEventId.get(),
-                                      probe_sample_size);
+                                                                reduceTask.getJobId().toString(),
+                                                                fromEventId.get(),
+                                                                probe_sample_size);
     
     List <MapOutputLocation> mapOutputsList = new ArrayList();
     for (int i = 0; i < t.length; i++) {
@@ -799,7 +799,7 @@
     private SequenceFile.Sorter sorter;
     
     public InMemFSMergeThread(InMemoryFileSystem inMemFileSys, 
-        LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
+                              LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
       this.inMemFileSys = inMemFileSys;
       this.localFileSys = localFileSys;
       this.sorter = sorter;
@@ -813,7 +813,7 @@
         //in flight. So we make sure that we have some 'closed' map
         //output files to merge to get the benefit of in-memory merge
         if (inMemClosedFiles.length >= 
-          (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
+            (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
           //name this output file same as the name of the first file that is 
           //there in the current list of inmem files (this is guaranteed to be
           //absent on the disk currently. So we don't overwrite a prev. 
@@ -822,12 +822,12 @@
           //is called (we delete empty sequence files as soon as we see them
           //in the merge method)
           SequenceFile.Writer writer = sorter.cloneFileAttributes(
-              inMemFileSys.makeQualified(inMemClosedFiles[0]), 
-              localFileSys.makeQualified(inMemClosedFiles[0]), null);
+                                                                  inMemFileSys.makeQualified(inMemClosedFiles[0]), 
+                                                                  localFileSys.makeQualified(inMemClosedFiles[0]), null);
           RawKeyValueIterator rIter;
           try {
             rIter = sorter.merge(inMemClosedFiles, true, 
-              inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
+                                 inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
           } catch (Exception e) { 
             //make sure that we delete the ondisk file that we created earlier
             //when we invoked cloneFileAttributes
@@ -844,12 +844,12 @@
         }
         else {
           LOG.info(reduceTask.getTaskId() + " Nothing to merge from " + 
-              inMemFileSys.getUri());
+                   inMemFileSys.getUri());
         }
       } catch (Throwable t) {
         LOG.warn(reduceTask.getTaskId() +
-            " Intermediate Merge of the inmemory files threw an exception: " + 
-            StringUtils.stringifyException(t));
+                 " Intermediate Merge of the inmemory files threw an exception: " + 
+                 StringUtils.stringifyException(t));
         ReduceTaskRunner.this.mergeThrowable = t;
       }
       finally {
@@ -858,8 +858,8 @@
     }
   }
   final private static PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
-    public boolean accept(Path file) {
-      return file.toString().endsWith(".out");
-    }     
-  };
+      public boolean accept(Path file) {
+        return file.toString().endsWith(".out");
+      }     
+    };
 }
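
One detail worth calling out in the reindented fetch loop above: when a copy from a host fails, that host goes into the penalty box until currentTime + 60 * 1000 + backoff.nextInt(maxBackoff * 1000). Purely as a worked example, if maxBackoff were 300 seconds (an assumed value, not one set in this commit), the next fetch attempt against that host would fall somewhere between 60 and 360 seconds later, and any other known outputs on the same host are moved back onto the retry list in the meantime.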

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Mon Apr 16 14:44:35 2007
@@ -29,13 +29,13 @@
    * A constant of Reporter type that does nothing.
    */
   public static final Reporter NULL = new Reporter() {
-    public void setStatus(String s) {
-    }
-    public void progress() throws IOException {
-    }
-    public void incrCounter(Enum key, long amount) {
-    }
-  };
+      public void setStatus(String s) {
+      }
+      public void progress() throws IOException {
+      }
+      public void incrCounter(Enum key, long amount) {
+      }
+    };
 
   /**
    * Alter the application's status description.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Mon Apr 16 14:44:35 2007
@@ -27,61 +27,61 @@
  * @author Mike Cafarella
  */
 public interface RunningJob {
-    /**
-     * Returns an identifier for the job
-     */
-    public String getJobID();
-
-    /**
-     * Returns the path of the submitted job.
-     */
-    public String getJobFile();
-
-    /**
-     * Returns a URL where some job progress information will be displayed.
-     */
-    public String getTrackingURL();
-
-    /**
-     * Returns a float between 0.0 and 1.0, indicating progress on
-     * the map portion of the job.  When all map tasks have completed,
-     * the function returns 1.0.
-     */
-    public float mapProgress() throws IOException;
-
-    /**
-     * Returns a float between 0.0 and 1.0, indicating progress on
-     * the reduce portion of the job.  When all reduce tasks have completed,
-     * the function returns 1.0.
-     */
-    public float reduceProgress() throws IOException;
-
-    /**
-     * Non-blocking function to check whether the job is finished or not.
-     */
-    public boolean isComplete() throws IOException;
-
-    /**
-     * True iff job completed successfully.
-     */
-    public boolean isSuccessful() throws IOException;
-
-    /**
-     * Blocks until the job is complete.
-     */
-    public void waitForCompletion() throws IOException;
-
-    /**
-     * Kill the running job.  Blocks until all job tasks have been
-     * killed as well.  If the job is no longer running, it simply returns.
-     */
-    public void killJob() throws IOException;
+  /**
+   * Returns an identifier for the job
+   */
+  public String getJobID();
+
+  /**
+   * Returns the path of the submitted job.
+   */
+  public String getJobFile();
+
+  /**
+   * Returns a URL where some job progress information will be displayed.
+   */
+  public String getTrackingURL();
+
+  /**
+   * Returns a float between 0.0 and 1.0, indicating progress on
+   * the map portion of the job.  When all map tasks have completed,
+   * the function returns 1.0.
+   */
+  public float mapProgress() throws IOException;
+
+  /**
+   * Returns a float between 0.0 and 1.0, indicating progress on
+   * the reduce portion of the job.  When all reduce tasks have completed,
+   * the function returns 1.0.
+   */
+  public float reduceProgress() throws IOException;
+
+  /**
+   * Non-blocking function to check whether the job is finished or not.
+   */
+  public boolean isComplete() throws IOException;
+
+  /**
+   * True iff job completed successfully.
+   */
+  public boolean isSuccessful() throws IOException;
+
+  /**
+   * Blocks until the job is complete.
+   */
+  public void waitForCompletion() throws IOException;
+
+  /**
+   * Kill the running job.  Blocks until all job tasks have been
+   * killed as well.  If the job is no longer running, it simply returns.
+   */
+  public void killJob() throws IOException;
     
-    public TaskCompletionEvent[] getTaskCompletionEvents(
-        int startFrom) throws IOException;
+  public TaskCompletionEvent[] getTaskCompletionEvents(
+                                                       int startFrom) throws IOException;
     
-    /**
-     * Gets the counters for this job.
-     */
-    public Counters getCounters() throws IOException;
+  /**
+   * Gets the counters for this job.
+   */
+  public Counters getCounters() throws IOException;
 }
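
The RunningJob interface reindented above is what JobClient hands back for a submitted job; a rough polling loop against it (illustrative only, assuming the enclosing method may throw IOException and InterruptedException) could look like:

  JobClient jc = new JobClient(conf);
  RunningJob running = jc.submitJob(conf);
  while (!running.isComplete()) {
    System.out.println("map " + running.mapProgress() +
                       ", reduce " + running.reduceProgress());
    Thread.sleep(5000);                                  // poll every few seconds
  }
  if (!running.isSuccessful()) {
    System.err.println("Job " + running.getJobID() + " failed");
  }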

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java Mon Apr 16 14:44:35 2007
@@ -32,7 +32,7 @@
   }
 
   public RecordReader getRecordReader(InputSplit split, JobConf job,
-      Reporter reporter) throws IOException {
+                                      Reporter reporter) throws IOException {
 
     reporter.setStatus(split.toString());
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java Mon Apr 16 14:44:35 2007
@@ -37,7 +37,7 @@
   private Writable innerValue = super.createValue();
 
   public SequenceFileAsTextRecordReader(Configuration conf, FileSplit split)
-      throws IOException {
+    throws IOException {
     super(conf, split);
   }
 
@@ -51,7 +51,7 @@
 
   /** Read key/value pair in a line. */
   public synchronized boolean next(Writable key, Writable value)
-      throws IOException {
+    throws IOException {
     Text tKey = (Text) key;
     Text tValue = (Text) value;
     if (!super.next(innerKey, innerValue)) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java Mon Apr 16 14:44:35 2007
@@ -44,264 +44,264 @@
  */
 
 public class SequenceFileInputFilter extends SequenceFileInputFormat {
-    final private static String FILTER_CLASS = "sequencefile.filter.class";
-    final private static String FILTER_FREQUENCY
-                       = "sequencefile.filter.frequency";
-    final private static String FILTER_REGEX = "sequencefile.filter.regex";
+  final private static String FILTER_CLASS = "sequencefile.filter.class";
+  final private static String FILTER_FREQUENCY
+    = "sequencefile.filter.frequency";
+  final private static String FILTER_REGEX = "sequencefile.filter.regex";
     
-    public SequenceFileInputFilter() {
-    }
+  public SequenceFileInputFilter() {
+  }
     
-    /** Create a record reader for the given split
-     * @param split file split
-     * @param job job configuration
-     * @param reporter reporter who sends report to task tracker
-     * @return RecordReader
-     */
-    public RecordReader getRecordReader(InputSplit split,
-            JobConf job, Reporter reporter)
+  /** Create a record reader for the given split
+   * @param split file split
+   * @param job job configuration
+   * @param reporter reporter who sends report to task tracker
+   * @return RecordReader
+   */
+  public RecordReader getRecordReader(InputSplit split,
+                                      JobConf job, Reporter reporter)
     throws IOException {
         
-        reporter.setStatus(split.toString());
+    reporter.setStatus(split.toString());
         
-        return new FilterRecordReader(job, (FileSplit) split);
-    }
+    return new FilterRecordReader(job, (FileSplit) split);
+  }
 
 
-    /** set the filter class
-     * 
-     * @param conf application configuration
-     * @param filterClass filter class
-     */
-    public static void setFilterClass(Configuration conf, Class filterClass) {
-        conf.set(FILTER_CLASS, filterClass.getName() );
-    }
+  /** set the filter class
+   * 
+   * @param conf application configuration
+   * @param filterClass filter class
+   */
+  public static void setFilterClass(Configuration conf, Class filterClass) {
+    conf.set(FILTER_CLASS, filterClass.getName() );
+  }
 
          
-    /**
-     * filter interface
+  /**
+   * filter interface
+   */
+  public interface Filter extends Configurable {
+    /** filter function
+     * Decide if a record should be filtered or not
+     * @param key record key
+     * @return true if a record is accepted; return false otherwise
      */
-    public interface Filter extends Configurable {
-        /** filter function
-         * Decide if a record should be filtered or not
-         * @param key record key
-         * @return true if a record is accepted; return false otherwise
-         */
-        public abstract boolean accept(Writable key);
-    }
+    public abstract boolean accept(Writable key);
+  }
     
-    /**
-     * base class for Filters
-     */
-    public static abstract class FilterBase implements Filter {
-        Configuration conf;
+  /**
+   * base class for Filters
+   */
+  public static abstract class FilterBase implements Filter {
+    Configuration conf;
         
-        public Configuration getConf() {
-            return conf;
-        }
+    public Configuration getConf() {
+      return conf;
     }
+  }
     
-    /** Records filter by matching key to regex
+  /** Records filter by matching key to regex
+   */
+  public static class RegexFilter extends FilterBase {
+    private Pattern p;
+    /** Define the filtering regex and stores it in conf
+     * @param conf where the regex is set
+     * @param regex regex used as a filter
      */
-    public static class RegexFilter extends FilterBase {
-        private Pattern p;
-        /** Define the filtering regex and stores it in conf
-         * @param conf where the regex is set
-         * @param regex regex used as a filter
-         */
-        public static void setPattern(Configuration conf, String regex )
-            throws PatternSyntaxException {
-            try {
-                Pattern.compile(regex);
-            } catch (PatternSyntaxException e) {
-                throw new IllegalArgumentException("Invalid pattern: "+regex);
-            }
-            conf.set(FILTER_REGEX, regex);
-        }
+    public static void setPattern(Configuration conf, String regex )
+      throws PatternSyntaxException {
+      try {
+        Pattern.compile(regex);
+      } catch (PatternSyntaxException e) {
+        throw new IllegalArgumentException("Invalid pattern: "+regex);
+      }
+      conf.set(FILTER_REGEX, regex);
+    }
         
-        public RegexFilter() { }
+    public RegexFilter() { }
         
-        /** configure the Filter by checking the configuration
-         */
-        public void setConf(Configuration conf) {
-            String regex = conf.get(FILTER_REGEX);
-            if(regex==null)
-                throw new RuntimeException(FILTER_REGEX + " not set");
-            this.p = Pattern.compile(regex);
-            this.conf = conf;
-        }
+    /** configure the Filter by checking the configuration
+     */
+    public void setConf(Configuration conf) {
+      String regex = conf.get(FILTER_REGEX);
+      if(regex==null)
+        throw new RuntimeException(FILTER_REGEX + " not set");
+      this.p = Pattern.compile(regex);
+      this.conf = conf;
+    }
 
 
-        /** Filtering method
-         * If key matches the regex, return true; otherwise return false
-         * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
-         */
-        public boolean accept(Writable key) {
-            return p.matcher(key.toString()).matches();
-        }
+    /** Filtering method
+     * If key matches the regex, return true; otherwise return false
+     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+     */
+    public boolean accept(Writable key) {
+      return p.matcher(key.toString()).matches();
     }
+  }
 
-    /** This class returns a percentage of records
-     * The percentage is determined by a filtering frequency <i>f</i> using
-     * the criteria record# % f == 0.
-     * For example, if the frequency is 10, one out of 10 records is returned.
+  /** This class returns a percentage of records
+   * The percentage is determined by a filtering frequency <i>f</i> using
+   * the criteria record# % f == 0.
+   * For example, if the frequency is 10, one out of 10 records is returned.
+   */
+  public static class PercentFilter extends FilterBase {
+    private int frequency;
+    private int count;
+
+    /** set the frequency and stores it in conf
+     * @param conf configuration
+     * @param frequency filtering frequency
      */
-    public static class PercentFilter extends FilterBase {
-        private int frequency;
-        private int count;
-
-        /** set the frequency and stores it in conf
-         * @param conf configuration
-         * @param frequency filtering frequency
-         */
-        public static void setFrequency(Configuration conf, int frequency ){
-            if(frequency<=0)
-                throw new IllegalArgumentException(
-                   "Negative " + FILTER_FREQUENCY + ": "+frequency);
-            conf.setInt(FILTER_FREQUENCY, frequency);
-        }
+    public static void setFrequency(Configuration conf, int frequency ){
+      if(frequency<=0)
+        throw new IllegalArgumentException(
+                                           "Negative " + FILTER_FREQUENCY + ": "+frequency);
+      conf.setInt(FILTER_FREQUENCY, frequency);
+    }
         
-        public PercentFilter() { }
+    public PercentFilter() { }
         
-        /** configure the filter by checking the configuration
-         * 
-         * @param conf configuration
-         */
-        public void setConf(Configuration conf) {
-            this.frequency = conf.getInt("sequencefile.filter.frequency", 10);
-            if(this.frequency <=0 ) {
-                throw new RuntimeException(
-                        "Negative "+FILTER_FREQUENCY+": "+this.frequency);
-            }
-            this.conf = conf;
-        }
-
-        /** Filtering method
-         * If record# % frequency==0, return true; otherwise return false
-         * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
-         */
-        public boolean accept(Writable key) {
-            boolean accepted = false;
-            if(count == 0)
-                accepted = true;
-            if( ++count == frequency ) {
-                count = 0;
-            }
-            return accepted;
-        }
+    /** configure the filter by checking the configuration
+     * 
+     * @param conf configuration
+     */
+    public void setConf(Configuration conf) {
+      this.frequency = conf.getInt("sequencefile.filter.frequency", 10);
+      if(this.frequency <=0 ) {
+        throw new RuntimeException(
+                                   "Negative "+FILTER_FREQUENCY+": "+this.frequency);
+      }
+      this.conf = conf;
     }
 
-    /** This class returns a set of records by examining the MD5 digest of its
-     * key against a filtering frequency <i>f</i>. The filtering criteria is
-     * MD5(key) % f == 0.
+    /** Filtering method
+     * If record# % frequency==0, return true; otherwise return false
+     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
      */
-    public static class MD5Filter extends FilterBase {
-        private int frequency;
-        private static final MessageDigest DIGESTER;
-        public static final int MD5_LEN = 16;
-        private byte [] digest = new byte[MD5_LEN];
-        
-        static {
-          try {
-            DIGESTER = MessageDigest.getInstance("MD5");
-          } catch (NoSuchAlgorithmException e) {
-            throw new RuntimeException(e);
-          }
-        }
+    public boolean accept(Writable key) {
+      boolean accepted = false;
+      if(count == 0)
+        accepted = true;
+      if( ++count == frequency ) {
+        count = 0;
+      }
+      return accepted;
+    }
+  }
 
+  /** This class returns a set of records by examining the MD5 digest of its
+   * key against a filtering frequency <i>f</i>. The filtering criteria is
+   * MD5(key) % f == 0.
+   */
+  public static class MD5Filter extends FilterBase {
+    private int frequency;
+    private static final MessageDigest DIGESTER;
+    public static final int MD5_LEN = 16;
+    private byte [] digest = new byte[MD5_LEN];
+        
+    static {
+      try {
+        DIGESTER = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+    }
 
-        /** set the filtering frequency in configuration
-         * 
-         * @param conf configuration
-         * @param frequency filtering frequency
-         */
-        public static void setFrequency(Configuration conf, int frequency ){
-            if(frequency<=0)
-                throw new IllegalArgumentException(
-                   "Negative " + FILTER_FREQUENCY + ": "+frequency);
-            conf.setInt(FILTER_FREQUENCY, frequency);
-        }
+
+    /** set the filtering frequency in configuration
+     * 
+     * @param conf configuration
+     * @param frequency filtering frequency
+     */
+    public static void setFrequency(Configuration conf, int frequency ){
+      if(frequency<=0)
+        throw new IllegalArgumentException(
+                                           "Negative " + FILTER_FREQUENCY + ": "+frequency);
+      conf.setInt(FILTER_FREQUENCY, frequency);
+    }
         
-        public MD5Filter() { }
+    public MD5Filter() { }
         
-        /** configure the filter according to configuration
-         * 
-         * @param conf configuration
-         */
-        public void setConf(Configuration conf) {
-            this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
-            if(this.frequency <=0 ) {
-                throw new RuntimeException(
-                        "Negative "+FILTER_FREQUENCY+": "+this.frequency);
-            }
-            this.conf = conf;
-        }
+    /** configure the filter according to configuration
+     * 
+     * @param conf configuration
+     */
+    public void setConf(Configuration conf) {
+      this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
+      if(this.frequency <=0 ) {
+        throw new RuntimeException(
+                                   "Negative "+FILTER_FREQUENCY+": "+this.frequency);
+      }
+      this.conf = conf;
+    }
 
-        /** Filtering method
-         * If MD5(key) % frequency==0, return true; otherwise return false
-         * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
-         */
-        public boolean accept(Writable key) {
-            try {
-                long hashcode;
-                if( key instanceof Text) {
-                    hashcode = MD5Hashcode((Text)key);
-                } else if( key instanceof BytesWritable) {
-                    hashcode = MD5Hashcode((BytesWritable)key);
-                } else {
-                    ByteBuffer bb;
-                    bb = Text.encode(key.toString());
-                    hashcode = MD5Hashcode(bb.array(),0, bb.limit());
-                }
-                if(hashcode/frequency*frequency==hashcode)
-                    return true;
-            } catch(Exception e) {
-                LOG.warn(e);
-                throw new RuntimeException(e);
-            }
-            return false;
-        }
+    /** Filtering method
+     * If MD5(key) % frequency==0, return true; otherwise return false
+     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(org.apache.hadoop.io.Writable)
+     */
+    public boolean accept(Writable key) {
+      try {
+        long hashcode;
+        if( key instanceof Text) {
+          hashcode = MD5Hashcode((Text)key);
+        } else if( key instanceof BytesWritable) {
+          hashcode = MD5Hashcode((BytesWritable)key);
+        } else {
+          ByteBuffer bb;
+          bb = Text.encode(key.toString());
+          hashcode = MD5Hashcode(bb.array(),0, bb.limit());
+        }
+        if(hashcode/frequency*frequency==hashcode)
+          return true;
+      } catch(Exception e) {
+        LOG.warn(e);
+        throw new RuntimeException(e);
+      }
+      return false;
+    }
         
-        private long MD5Hashcode(Text key) throws DigestException {
-            return MD5Hashcode(key.getBytes(), 0, key.getLength());
-        }
+    private long MD5Hashcode(Text key) throws DigestException {
+      return MD5Hashcode(key.getBytes(), 0, key.getLength());
+    }
         
-        private long MD5Hashcode(BytesWritable key) throws DigestException {
-            return MD5Hashcode(key.get(), 0, key.getSize());
-        }
-        synchronized private long MD5Hashcode(byte[] bytes, 
-                int start, int length ) throws DigestException {
-            DIGESTER.update(bytes, 0, length);
-            DIGESTER.digest(digest, 0, MD5_LEN);
-            long hashcode=0;
-            for (int i = 0; i < 8; i++)
-                hashcode |= ((digest[i] & 0xffL) << (8*(7-i)));
-            return hashcode;
-        }
+    private long MD5Hashcode(BytesWritable key) throws DigestException {
+      return MD5Hashcode(key.get(), 0, key.getSize());
     }
+    synchronized private long MD5Hashcode(byte[] bytes, 
+                                          int start, int length ) throws DigestException {
+      DIGESTER.update(bytes, 0, length);
+      DIGESTER.digest(digest, 0, MD5_LEN);
+      long hashcode=0;
+      for (int i = 0; i < 8; i++)
+        hashcode |= ((digest[i] & 0xffL) << (8*(7-i)));
+      return hashcode;
+    }
+  }
     
-    private static class FilterRecordReader extends SequenceFileRecordReader {
-        private Filter filter;
+  private static class FilterRecordReader extends SequenceFileRecordReader {
+    private Filter filter;
         
-        public FilterRecordReader(Configuration conf, FileSplit split)
-        throws IOException {
-            super(conf, split);
-            // instantiate filter
-            filter = (Filter)ReflectionUtils.newInstance(
-                    conf.getClass(FILTER_CLASS, PercentFilter.class), 
-                    conf);
-        }
+    public FilterRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+      super(conf, split);
+      // instantiate filter
+      filter = (Filter)ReflectionUtils.newInstance(
+                                                   conf.getClass(FILTER_CLASS, PercentFilter.class), 
+                                                   conf);
+    }
         
-        public synchronized boolean next(Writable key, Writable value)
-                              throws IOException {
-            while (next(key)) {
-                if(filter.accept(key)) {
-                    getCurrentValue(value);
-                    return true;
-                }
-            }
-            
-            return false;
+    public synchronized boolean next(Writable key, Writable value)
+      throws IOException {
+      while (next(key)) {
+        if(filter.accept(key)) {
+          getCurrentValue(value);
+          return true;
         }
+      }
+            
+      return false;
     }
+  }
 }

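As a usage sketch (the rest of the job setup, such as the mapper and the input
and output paths, is assumed to be configured elsewhere), the filter is enabled
by selecting this input format and choosing one of the filter classes above:

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.SequenceFileInputFilter;

  public class FilterSetup {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      job.setInputFormat(SequenceFileInputFilter.class);

      // Keep roughly one record in five (record# % 5 == 0).
      SequenceFileInputFilter.setFilterClass(
        job, SequenceFileInputFilter.PercentFilter.class);
      SequenceFileInputFilter.PercentFilter.setFrequency(job, 5);

      // Alternatively, keep only records whose key matches a regex:
      // SequenceFileInputFilter.setFilterClass(
      //   job, SequenceFileInputFilter.RegexFilter.class);
      // SequenceFileInputFilter.RegexFilter.setPattern(job, "part-.*");
    }
  }

FilterRecordReader then instantiates the configured filter (PercentFilter by
default) and only returns records whose keys pass accept().
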
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java Mon Apr 16 14:44:35 2007
@@ -111,8 +111,8 @@
    * @param servletClass The servlet class
    */
   public <T extends HttpServlet> 
-  void addServlet(String name, String pathSpec, 
-                  Class<T> servletClass) {
+                    void addServlet(String name, String pathSpec, 
+                                    Class<T> servletClass) {
     WebApplicationContext context = webAppContext;
     try {
       if (name == null) {
@@ -223,7 +223,7 @@
   public static class StackServlet extends HttpServlet {
     public void doGet(HttpServletRequest request, 
                       HttpServletResponse response
-                     ) throws ServletException, IOException {
+                      ) throws ServletException, IOException {
       OutputStream outStream = response.getOutputStream();
       ReflectionUtils.printThreadInfo(new PrintWriter(outStream), "");
       outStream.close();

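A hypothetical caller-side sketch of the generic addServlet above; how the
StatusHttpServer instance is created and started is not part of this hunk and
is assumed to happen elsewhere:

  import org.apache.hadoop.mapred.StatusHttpServer;

  public class StacksSetup {
    // server is assumed to be constructed and started elsewhere.
    static void registerStacks(StatusHttpServer server) {
      // (name, pathSpec, servletClass) mirrors the addServlet signature above
      server.addServlet("stacks", "/stacks", StatusHttpServer.StackServlet.class);
    }
  }
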
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Apr 16 14:44:35 2007
@@ -68,7 +68,7 @@
   public Task() {}
 
   public Task(String jobId, String jobFile, String tipId, 
-      String taskId, int partition) {
+              String taskId, int partition) {
     this.jobFile = jobFile;
     this.taskId = taskId;
     this.jobId = jobId;
@@ -184,14 +184,14 @@
           }
         }
         public void progress() throws IOException {
-            reportProgress(umbilical);
+          reportProgress(umbilical);
         }
         public void incrCounter(Enum key, long amount) {
-            Counters counters = getCounters();
-            if (counters != null) {
-              counters.incrCounter(key, amount);
-            }
+          Counters counters = getCounters();
+          if (counters != null) {
+            counters.incrCounter(key, amount);
           }
+        }
       };
   }