You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/06/13 21:33:18 UTC

svn commit: r190497 - in /lucene/nutch/branches/mapred: conf/nutch-default.xml src/java/org/apache/nutch/mapred/JobClient.java src/java/org/apache/nutch/mapred/LocalJobRunner.java

Author: cutting
Date: Mon Jun 13 12:33:17 2005
New Revision: 190497

URL: http://svn.apache.org/viewcvs?rev=190497&view=rev
Log:
Add local, in-process job runner.

Added:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
Modified:
    lucene/nutch/branches/mapred/conf/nutch-default.xml
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobClient.java

Modified: lucene/nutch/branches/mapred/conf/nutch-default.xml
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/conf/nutch-default.xml?rev=190497&r1=190496&r2=190497&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/conf/nutch-default.xml (original)
+++ lucene/nutch/branches/mapred/conf/nutch-default.xml Mon Jun 13 12:33:17 2005
@@ -347,7 +347,7 @@
 
 <property>
   <name>mapred.job.tracker</name>
-  <value>localhost:8010</value>
+  <value>local</value>
   <description>The host and port that the MapReduce job tracker runs at.
   </description>
 </property>

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobClient.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobClient.java?rev=190497&r1=190496&r2=190497&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobClient.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/JobClient.java Mon Jun 13 12:33:17 2005
@@ -156,7 +156,6 @@
     }
 
     JobSubmissionProtocol jobSubmitClient;
-    InetSocketAddress jobTrackAddr;
     NutchFileSystem fs = null;
 
     static Random r = new Random();
@@ -165,17 +164,24 @@
      * Build a job client, connect to the default job tracker
      */
     public JobClient(NutchConf conf) throws IOException {
-      this(JobTracker.getAddress(conf));
+      String tracker = conf.get("mapred.job.tracker", "local");
+      if ("local".equals(tracker)) {
+        this.jobSubmitClient = new LocalJobRunner();
+      } else {
+        this.jobSubmitClient = (JobSubmissionProtocol) 
+          RPC.getProxy(JobSubmissionProtocol.class,
+                       JobTracker.getAddress(conf));
+      }
     }
   
     /**
      * Build a job client, connect to the indicated job tracker.
      */
     public JobClient(InetSocketAddress jobTrackAddr) throws IOException {
-        this.jobTrackAddr = jobTrackAddr;
         this.jobSubmitClient = (JobSubmissionProtocol) 
             RPC.getProxy(JobSubmissionProtocol.class, jobTrackAddr);
     }
+
 
     /**
      */

Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java?rev=190497&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java Mon Jun 13 12:33:17 2005
@@ -0,0 +1,138 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.mapred;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+
+/** Implements MapReduce locally, in-process, for debugging. */ 
+public class LocalJobRunner implements JobSubmissionProtocol {
+
+  private NutchFileSystem fs;
+  private HashMap jobs = new HashMap();
+
+  private class Job extends Thread
+    implements TaskUmbilicalProtocol {
+    private String file;
+    private String id;
+    private JobConf job;
+
+    private boolean mapping = true;
+    private JobStatus status = new JobStatus();
+
+    public Job(String file) throws IOException {
+      this.file = file;
+      this.id = "job_" + newId();
+
+      File localFile = new File(JobConf.getLocalDir(), id+".xml");
+      fs.copyToLocalFile(new File(file), localFile);
+      this.job = new JobConf(localFile);
+
+      this.status.jobid = id;
+      this.status.runState = JobStatus.RUNNING;
+
+      jobs.put(id, this);
+
+      this.start();
+    }
+
+    public void run() {
+      try {
+        // split input into minimum number of splits
+        FileSplit[] splits = job.getInputFormat().getSplits(fs, job, 1);
+
+        // run a map task for each split
+        String mapIds[] = new String[splits.length];
+        for (int i = 0; i < mapIds.length; i++) {
+          mapIds[i] = "map_" + newId();
+          MapTask map = new MapTask(file, mapIds[i], splits[i]);
+          map.run(job, this);
+        }
+
+        // move map output to reduce input
+        String reduceId = "_" + newId();
+        for (int i = 0; i < mapIds.length; i++) {
+          fs.rename(MapOutputFile.getOutputFile(mapIds[i], 0),
+                    MapOutputFile.getInputFile(mapIds[i], reduceId));
+          MapOutputFile.removeAll(mapIds[i]);
+        }
+
+        // run a single reduce task
+        ReduceTask reduce = new ReduceTask(file, reduceId, mapIds, 0);
+        reduce.run(job, this);
+        MapOutputFile.removeAll(reduceId);
+        
+        this.status.runState = JobStatus.SUCCEEDED;
+
+      } catch (Throwable t) {
+        this.status.runState = JobStatus.FAILED;
+        t.printStackTrace();
+      }
+    }
+
+    private String newId() {
+      return Integer.toString(Math.abs(new Random().nextInt()),36);
+    }
+
+    // TaskUmbilicalProtocol methods
+
+    public Task getTask(String taskid) { return null; }
+
+    public void progress(String taskid, FloatWritable progress) {
+      if (mapping) {
+        status.mapProgress = progress.get();
+      } else {
+        status.reduceProgress = progress.get();
+      }
+    }
+
+    public void done(String taskid) throws IOException {}
+
+
+  }
+
+  public LocalJobRunner() throws IOException {
+    this.fs = NutchFileSystem.get();
+  }
+
+  // JobSubmissionProtocol methods
+
+  public JobStatus submitJob(String jobFile) throws IOException {
+    return new Job(jobFile).status;
+  }
+
+  public void killJob(String id) {
+    ((Thread)jobs.get(id)).stop();
+  }
+
+  public JobProfile getJobProfile(String id) {
+    Job job = (Job)jobs.get(id);
+    return new JobProfile(id, job.file, "http://localhost:8080/");
+  }
+
+  public JobStatus getJobStatus(String id) {
+    Job job = (Job)jobs.get(id);
+    return job.status;
+  }
+
+  public String getFilesystemName() throws IOException {
+    return fs.getName();
+  }
+}