You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by hy...@apache.org on 2010/04/25 12:03:29 UTC

svn commit: r937774 - in /incubator/hama/trunk: bin/ conf/ src/java/ src/java/org/apache/hama/ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/graph/ src/java/org/apache/hama/ipc/

Author: hyunsik
Date: Sun Apr 25 10:03:28 2010
New Revision: 937774

URL: http://svn.apache.org/viewvc?rev=937774&view=rev
Log:
HAMA-247: Discuss / Refactor HamaMaster and GroomServer

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java   (with props)
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java   (with props)
    incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java   (with props)
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java   (with props)
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java   (with props)
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java   (with props)
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java   (with props)
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java   (with props)
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java   (with props)
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java   (with props)
Removed:
    incubator/hama/trunk/src/java/groomserver-default.xml
    incubator/hama/trunk/src/java/org/apache/hama/HamaMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/ID.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/JobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/JobContext.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/JobID.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/JobStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/TaskAttemptContext.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/TaskAttemptID.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/TaskID.java
Modified:
    incubator/hama/trunk/bin/hama
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/InputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/OutputCommitter.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/OutputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/graph/RecordWriter.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java

Modified: incubator/hama/trunk/bin/hama
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/bin/hama?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/bin/hama (original)
+++ incubator/hama/trunk/bin/hama Sun Apr 25 10:03:28 2010
@@ -162,10 +162,10 @@ unset IFS
 
 # figure out which class to run
 if [ "$COMMAND" = "bspmaster" ] ; then
-  CLASS='org.apache.hama.HamaMaster'
+  CLASS='org.apache.hama.bsp.BSPMaster'
   BSP_OPTS="$BSP_OPTS $BSP_BSPMASTER_OPTS"
 elif [ "$COMMAND" = "groom" ] ; then
-  CLASS='org.apache.hama.graph.GroomServer'
+  CLASS='org.apache.hama.bsp.GroomServer'
   BSP_OPTS="$BSP_OPTS $BSP_GROOMSERVER_OPTS"
 elif [ "$COMMAND" = "version" ] ; then
   CLASS=org.apache.bsp.util.VersionInfo

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Sun Apr 25 10:03:28 2010
@@ -31,7 +31,19 @@
   <property>
     <name>hama.groom.port</name>
     <value>40020</value>
-    <description>The port an groom server binds to.
+    <description>The port a groom server binds to.</description>
+  </property>
+  
+  <property>
+    <name>bspd.system.dir</name>
+    <value>${hadoop.tmp.dir}/bsp/system</value>
+    <description>The shared directory where BSP stores control files.
     </description>
   </property>
+  
+  <property>
+    <name>bspd.groom.local.dir</name>
+    <value>${hadoop.tmp.dir}/groomserver/local</value>
+    <description>Local directory for temporary storage.</description>
+  </property>
 </configuration>
\ No newline at end of file

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,231 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.HeartbeatResponse;
+import org.apache.hama.ipc.InterTrackerProtocol;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+
+/**
+ * BSPMaster is responsible for controlling all the BSP peers and for managing BSP jobs.
+ */
+public class BSPMaster implements JobSubmissionProtocol, InterTrackerProtocol {
+  static{
+    Configuration.addDefaultResource("hama-default.xml");
+  }
+  
+  public static final Log LOG = LogFactory.getLog(BSPMaster.class);
+  
+  private HamaConfiguration conf;  
+  public static enum State { INITIALIZING, RUNNING }
+  State state = State.INITIALIZING;
+  
+  String masterIdentifier;
+  
+  private Server interTrackerServer;  
+  
+  FileSystem fs = null;
+  Path systemDir = null;
+  
+  // system directories: full access for the owner, write and execute only for group and others
+  final static FsPermission SYSTEM_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0733); // rwx-wx-wx
+
+  // system files should have 700 permission
+  final static FsPermission SYSTEM_FILE_PERMISSION =
+    FsPermission.createImmutable((short) 0700); // rwx------
+  
+  private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+  
+  private int nextJobId = 1;
+  
+  public BSPMaster(HamaConfiguration conf, String identifier) throws IOException, InterruptedException {
+    this.conf = conf;
+    
+    this.masterIdentifier = identifier;
+    
+    InetSocketAddress addr = getAddress(conf);    
+    this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), conf);
+    
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        if (fs == null) {
+          fs = FileSystem.get(conf);
+        }
+        // clean up the system dir, which will only work if hdfs is out of 
+        // safe mode
+        if(systemDir == null) {
+          systemDir = new Path(getSystemDir());    
+        }
+
+        LOG.info("Cleaning up the system directory");
+        fs.delete(systemDir, true);
+        if (FileSystem.mkdirs(fs, systemDir, 
+            new FsPermission(SYSTEM_DIR_PERMISSION))) {
+          break;
+        }
+        LOG.error("Mkdirs failed to create " + systemDir);
+
+      } catch (AccessControlException ace) {
+        LOG.warn("Failed to operate on bspd.system.dir (" + systemDir 
+            + ") because of permissions.");
+        LOG.warn("Manually delete the bspd.system.dir (" + systemDir 
+            + ") and then start the JobTracker.");
+        LOG.warn("Bailing out ... ");
+        throw ace;
+      } catch (IOException ie) {
+        LOG.info("problem cleaning system directory: " + systemDir, ie);
+      }
+      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+    }
+    
+    // deleteLocalFiles(SUBDIR);
+  }
+  
+  public static BSPMaster startMaster(HamaConfiguration conf) throws IOException,
+  InterruptedException {
+    return startTracker(conf, generateNewIdentifier());
+  }
+  
+  public static BSPMaster startTracker(HamaConfiguration conf, String identifier) 
+  throws IOException, InterruptedException {
+    
+    BSPMaster result = null;
+    result = new BSPMaster(conf, identifier);
+    
+    return result;
+  }
+  
+  public static InetSocketAddress getAddress(Configuration conf) {
+    String hamaMasterStr = conf.get("hama.master.address", "localhost:40000");
+    return NetUtils.createSocketAddr(hamaMasterStr);
+  }
+  
+  public int getPort() {
+    return this.conf.getInt("hama.master.port", 0);
+  }
+
+  public HamaConfiguration getConfiguration() {
+    return this.conf;
+  }
+  
+  private static SimpleDateFormat getDateFormat() {
+    return new SimpleDateFormat("yyyyMMddHHmm");
+  }
+
+  /**
+   * 
+   * @return
+   */
+  private static String generateNewIdentifier() {
+    return getDateFormat().format(new Date());
+  }
+  
+  public void offerService() throws InterruptedException, IOException {
+    this.interTrackerServer.start();
+    
+    synchronized (this) {
+      state = State.RUNNING;
+    }
+    LOG.info("Starting RUNNING");
+    
+    this.interTrackerServer.join();
+    LOG.info("Stopped interTrackerServer");
+  }
+  
+  
+  public static void main(String [] args) { // entry point; accepts no arguments
+    StringUtils.startupShutdownMessage(BSPMaster.class, args, LOG);
+    if (args.length != 0) {
+      System.out.println("usage: BSPMaster"); // was "HamaMaster", the class's former name
+      System.exit(-1);
+    }
+    // Start the master and serve; any failure is logged as fatal and exits with -1.
+    try {
+      HamaConfiguration conf = new HamaConfiguration();
+      BSPMaster master = startMaster(conf);
+      master.offerService();
+    } catch (Throwable e) {
+      LOG.fatal(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion) throws IOException {    
+    if (protocol.equals(InterTrackerProtocol.class.getName())) {
+      return InterTrackerProtocol.versionID;
+    } else if (protocol.equals(JobSubmissionProtocol.class.getName())){
+      return JobSubmissionProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol to job tracker: " + protocol);
+    }
+  }
+
+  /**
+   * A RPC method for transmitting each peer status from peer to master.
+   */
+  @Override
+  public HeartbeatResponse heartbeat(short responseId) {
+    LOG.debug(">>> return the heartbeat message.");    
+    return new HeartbeatResponse((short)1);
+  }
+  
+  /**
+   * Returns the system directory in which BSP stores control files.
+   */
+  @Override
+  public String getSystemDir() {
+    Path sysDir = new Path(conf.get("bspd.system.dir", "/tmp/hadoop/bsp/system"));  
+    return fs.makeQualified(sysDir).toString();
+  }
+
+  
+  /**
+   * Returns a new, sequentially increasing job id (synchronized: nextJobId++ is not atomic).
+   */
+  @Override
+  public synchronized JobID getNewJobId() throws IOException {
+    return new JobID(this.masterIdentifier, nextJobId++);
+  }
+
+  @Override
+  public JobStatus submitJob(JobID jobName) throws IOException {
+    
+    return null;
+  }
+}

Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Sun Apr 25 10:03:28 2010
@@ -48,7 +48,7 @@ public class BSPPeer implements DefaultB
   protected InetSocketAddress masterAddr = null;
   protected Server server = null;
   protected ZooKeeper zk = null;
-  protected Integer mutex = 0;
+  protected volatile Integer mutex = 0;
 
   protected final String serverName;
   protected final String bindAddress;

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.HeartbeatResponse;
+import org.apache.hama.ipc.InterTrackerProtocol;
+
+public class GroomServer implements Runnable {
+  public static final Log LOG = LogFactory.getLog(GroomServer.class);
+
+  static {
+    Configuration.addDefaultResource("hama-default.xml");
+  }
+
+  static enum State {
+    NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
+  };
+
+  HamaConfiguration conf;
+
+  volatile boolean running = true;
+  volatile boolean shuttingDown = false;
+  boolean justInited = true;
+
+  String groomserverName;
+  String localHostname;
+
+  InetSocketAddress masterAddr;
+  InterTrackerProtocol jobClient;
+  //BSPPeer bspPeer;
+
+  short heartbeatResponseId = -1;
+  private volatile int heartbeatInterval = 3 * 1000;
+
+  private LocalDirAllocator localDirAllocator;
+  Path systemDirectory = null;
+  FileSystem systemFS = null;
+
+  public GroomServer(HamaConfiguration conf) throws IOException {
+    this.conf = conf;
+    masterAddr = BSPMaster.getAddress(conf);
+
+    FileSystem local = FileSystem.getLocal(conf);
+    this.localDirAllocator = new LocalDirAllocator("bspd.groom.local.dir");
+
+    initialize();
+  }
+
+  synchronized void initialize() throws IOException {
+    if (this.conf.get("slave.host.name") != null) {
+      this.localHostname = conf.get("slave.host.name");
+    }
+
+    if (localHostname == null) {
+      this.localHostname = DNS.getDefaultHost(conf.get(
+          "bspd.groom.dns.interface", "default"), conf.get(
+          "bspd.groom.dns.nameserver", "default"));
+    }
+
+    checkLocalDirs(conf.getStrings("bspd.groom.local.dir"));
+    deleteLocalFiles("groomserver");
+
+    this.groomserverName = "groomd_" + localHostname;
+    LOG.info("Starting tracker " + this.groomserverName);
+
+    DistributedCache.purgeCache(this.conf);
+
+    this.jobClient = (InterTrackerProtocol) RPC.waitForProxy(
+        InterTrackerProtocol.class, InterTrackerProtocol.versionID, masterAddr,
+        conf);
+    this.running = true;
+    // this.bspPeer = new BSPPeer(this.conf);
+  }
+
+  private static void checkLocalDirs(String[] localDirs)
+      throws DiskErrorException {
+    boolean writable = false;
+
+    if (localDirs != null) {
+      for (int i = 0; i < localDirs.length; i++) {
+        try {
+          DiskChecker.checkDir(new File(localDirs[i]));
+          writable = true;
+        } catch (DiskErrorException e) {
+          LOG.warn("Graph Processor local " + e.getMessage());
+        }
+      }
+    }
+
+    if (!writable)
+      throw new DiskErrorException("all local directories are not writable");
+  }
+
+  public String[] getLocalDirs() {
+    return conf.getStrings("bspd.groom.local.dir");
+  }
+
+  public void deleteLocalFiles() throws IOException {
+    String[] localDirs = getLocalDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]));
+    }
+  }
+
+  public void deleteLocalFiles(String subdir) throws IOException {
+    String[] localDirs = getLocalDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir));
+    }
+  }
+
+  public void cleanupStorage() throws IOException {
+    deleteLocalFiles();
+  }
+
+  private void startCleanupThreads() throws IOException {
+
+  }
+
+  public State offerService() throws Exception {
+    long lastHeartbeat = 0;
+
+    while (running && !shuttingDown) {
+      try {
+        long now = System.currentTimeMillis();
+
+        long waitTime = heartbeatInterval - (now - lastHeartbeat);
+        if (waitTime > 0) {
+          // sleeps for the wait time
+          Thread.sleep(waitTime);
+        }
+
+        if (justInited) {
+          String dir = jobClient.getSystemDir();
+          if (dir == null) {
+            throw new IOException("Failed to get system directory");
+          }
+          systemDirectory = new Path(dir);
+          systemFS = systemDirectory.getFileSystem(conf);
+        }
+
+        // Send the heartbeat and process the jobtracker's directives
+        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
+
+        // Note the time when the heartbeat returned, use this to decide when to
+        // send the
+        // next heartbeat
+        lastHeartbeat = System.currentTimeMillis();
+
+        justInited = false;
+      } catch (InterruptedException ie) {
+        LOG.info("Interrupted. Closing down.");
+        return State.INTERRUPTED;
+      } catch (DiskErrorException de) {
+        String msg = "Exiting task tracker for disk error:\n"
+            + StringUtils.stringifyException(de);
+        LOG.error(msg);
+
+        return State.STALE;
+      } catch (RemoteException re) {
+        return State.DENIED;
+      } catch (Exception except) {
+        String msg = "Caught exception: "
+            + StringUtils.stringifyException(except);
+        LOG.error(msg);
+      }
+    }
+
+    return State.NORMAL;
+  }
+
+  private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+    HeartbeatResponse heartbeatResponse = jobClient
+        .heartbeat(heartbeatResponseId);
+    return heartbeatResponse;
+  }
+
+  public void run() {
+    try {
+      startCleanupThreads();
+      boolean denied = false;
+      while (running && !shuttingDown && !denied) {
+        boolean staleState = false;
+        try {
+          while (running && !staleState && !shuttingDown && !denied) {
+            try {
+              State osState = offerService();
+              if (osState == State.STALE) {
+                staleState = true;
+              } else if (osState == State.DENIED) {
+                denied = true;
+              }
+            } catch (Exception e) {
+              if (!shuttingDown) {
+                LOG.info("Lost connection to GraphProcessor [" + masterAddr
+                    + "].  Retrying...", e);
+                try {
+                  Thread.sleep(5000);
+                } catch (InterruptedException ie) {
+                }
+              }
+            }
+          }
+        } finally {
+          // close();
+        }
+
+        if (shuttingDown) {
+          return;
+        }
+        LOG.warn("Reinitializing local state");
+        initialize();
+      }
+    } catch (IOException ioe) {
+      LOG.error("Got fatal exception while reinitializing TaskTracker: "
+          + StringUtils.stringifyException(ioe));
+      return;
+    }
+  }
+
+  public synchronized void shutdown() throws IOException {
+    shuttingDown = true;
+    close();
+  }
+
+  public synchronized void close() throws IOException {
+    this.running = false;
+
+    cleanupStorage();
+
+    // shutdown RPC connections
+    RPC.stopProxy(jobClient);
+  }
+
+  public static void main(String[] args) {
+    StringUtils.startupShutdownMessage(GroomServer.class, args, LOG);
+    if (args.length != 0) {
+      System.out.println("usage: GroomServer");
+      System.exit(-1);
+    }
+
+    try {
+      HamaConfiguration conf = new HamaConfiguration();
+      new GroomServer(conf).run();
+    } catch (Throwable e) {
+      LOG.fatal(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+  }
+}

Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public abstract class ID implements WritableComparable<ID> {
+  protected static final char SEPARATOR = '_';
+  protected int id;
+
+  public ID(int id) {
+    this.id = id;
+  }
+
+  protected ID() {
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(id);
+  }
+
+  @Override
+  public int hashCode() {
+    return Integer.valueOf(id).hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null)
+      return false;
+    if (o.getClass() == this.getClass()) {
+      ID that = (ID) o;
+      return this.id == that.id;
+    } else
+      return false;
+  }
+
+  public int compareTo(ID that) {
+    return this.id < that.id ? -1 : (this.id == that.id ? 0 : 1); // subtraction could overflow
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.id = in.readInt();
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+  }
+}

Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/ID.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+
+public class JobClient extends Configured {
+  private static final Log LOG = LogFactory.getLog(JobClient.class);
+
+  public static enum TaskStatusFilter {
+    NONE, KILLED, FAILED, SUCCEEDED, ALL
+  }
+
+  static {
+    Configuration.addDefaultResource("hama-default.xml");
+  }
+
+  @SuppressWarnings("unused")
+  private JobSubmissionProtocol jobSubmitClient = null;
+
+  public JobClient() {
+  }
+
+  public JobClient(HamaConfiguration conf) throws IOException {
+    setConf(conf);
+    init(conf);
+  }
+
+  public void init(HamaConfiguration conf) throws IOException {
+    // The master address is read from "hama.master.address" by BSPMaster.getAddress(conf).
+    this.jobSubmitClient = createRPCProxy(BSPMaster.getAddress(conf), conf);
+  }
+
+  private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
+      Configuration conf) throws IOException {
+    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+        JobSubmissionProtocol.versionID, addr, conf, NetUtils.getSocketFactory(
+            conf, JobSubmissionProtocol.class));
+  }
+}

Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobClient.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.InputFormat;
+import org.apache.hama.graph.OutputFormat;
+
+/**
+ * A read-only view of the job that is provided to the tasks while they are
+ * running.
+ */
+public class JobContext {
+  // Put all of the attribute names in here so that Job and JobContext are
+  // consistent.
+  protected static final String INPUT_FORMAT_CLASS_ATTR = "angrapa.inputformat.class";
+  protected static final String WALKER_CLASS_ATTR = "angrapa.walker.class";
+  protected static final String OUTPUT_FORMAT_CLASS_ATTR = "angrapa.outputformat.class";
+
+  protected final Configuration conf;
+  private final JobID jobId;
+
+  public JobContext(Configuration conf, JobID jobId) {
+    this.conf = conf;
+    this.jobId = jobId;
+  }
+
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  public JobID getJobID() {
+    return jobId;
+  }
+
+  public Path getWorkingDirectory() throws IOException {
+    String name = conf.get("angrapa.working.dir");
+
+    if (name != null) {
+      return new Path(name);
+    } else {
+      try {
+        Path dir = FileSystem.get(conf).getWorkingDirectory();
+        conf.set("angrapa.working.dir", dir.toString());
+        return dir;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public Class<?> getOutputKeyClass() {
+    return conf.getClass("angrapa.output.key.class", LongWritable.class,
+        Object.class);
+  }
+
+  public Class<?> getOutputValueClass() {
+    return conf
+        .getClass("angrapa.output.value.class", Text.class, Object.class);
+  }
+
+  public String getJobName() {
+    return conf.get("angrapa.job.name", "");
+  }
+
+  @SuppressWarnings("unchecked")
+  public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+      throws ClassNotFoundException {
+    return (Class<? extends InputFormat<?, ?>>) conf.getClass(
+        INPUT_FORMAT_CLASS_ATTR, InputFormat.class); // TODO: To be corrected
+    // to an implemented class
+  }
+
+  @SuppressWarnings("unchecked")
+  public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+      throws ClassNotFoundException {
+    return (Class<? extends OutputFormat<?, ?>>) conf.getClass(
+        OUTPUT_FORMAT_CLASS_ATTR, OutputFormat.class); // TODO: To be corrected
+    // to an implemented
+    // class
+  }
+
+  public RawComparator<?> getSortComparator() {
+    return null;
+  }
+
+  public String getJar() {
+    return conf.get("walker.jar");
+  }
+}

Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobContext.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Identifier of a job, made of a textual "jt" identifier plus the integer id
+ * inherited from {@link ID}. The string form produced by {@link #toString()}
+ * is "job" + SEPARATOR + jtIdentifier + SEPARATOR + id, where the id is
+ * zero-padded to at least four digits.
+ */
+public class JobID extends ID implements Comparable<ID> {
+  protected static final String JOB = "job";
+  private final Text jtIdentifier;
+
+  /** Formats the numeric id without grouping and with at least 4 digits. */
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(4);
+  }
+
+  /**
+   * Creates a job id.
+   *
+   * @param jtIdentifier textual identifier stored alongside the numeric id
+   * @param id numeric job id
+   */
+  public JobID(String jtIdentifier, int id) {
+    super(id);
+    this.jtIdentifier = new Text(jtIdentifier);
+  }
+
+  /** Creates an empty id whose fields are filled in by {@link #readFields}. */
+  public JobID() {
+    jtIdentifier = new Text();
+  }
+
+  /** @return the textual identifier as a String */
+  public String getJtIdentifier() {
+    return jtIdentifier.toString();
+  }
+
+  /**
+   * Two job ids are equal when the superclass considers them equal and their
+   * textual identifiers match.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (!super.equals(o))
+      return false;
+
+    // NOTE(review): the cast relies on ID.equals rejecting null and objects
+    // of a different class - confirm against ID.
+    JobID that = (JobID) o;
+    return this.jtIdentifier.equals(that.jtIdentifier);
+  }
+
+  /**
+   * Orders by textual identifier first, then by numeric id.
+   *
+   * @throws ClassCastException if {@code o} is not a JobID
+   */
+  @Override
+  public int compareTo(ID o) {
+    JobID that = (JobID) o;
+    int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
+    if (jtComp != 0) {
+      return jtComp;
+    }
+    // Explicit comparison instead of "this.id - that.id": the subtraction
+    // overflows (and flips sign) when the ids have opposite signs.
+    return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+  }
+
+  /**
+   * Appends SEPARATOR, the textual identifier, SEPARATOR and the formatted
+   * numeric id to the given builder.
+   *
+   * @param builder the builder to append to
+   * @return the same builder, for chaining
+   */
+  public StringBuilder appendTo(StringBuilder builder) {
+    builder.append(SEPARATOR);
+    builder.append(jtIdentifier);
+    builder.append(SEPARATOR);
+    builder.append(idFormat.format(id));
+    return builder;
+  }
+
+  @Override
+  public int hashCode() {
+    return jtIdentifier.hashCode() + id;
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(JOB)).toString();
+  }
+
+  /** Reads the superclass fields first, then the textual identifier. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.jtIdentifier.readFields(in);
+  }
+
+  /** Writes the superclass fields first, then the textual identifier. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    jtIdentifier.write(out);
+  }
+
+  /**
+   * Parses a string of the form "job_&lt;jtIdentifier&gt;_&lt;id&gt;".
+   *
+   * @param str the string to parse, may be null
+   * @return the parsed id, or null when {@code str} is null
+   * @throws IllegalArgumentException if the string does not have exactly
+   *           three "_"-separated parts, does not start with "job", or its
+   *           last part is not an integer (the parse failure is attached as
+   *           the cause)
+   */
+  public static JobID forName(String str) throws IllegalArgumentException {
+    if (str == null)
+      return null;
+
+    NumberFormatException cause = null;
+    String[] parts = str.split("_");
+    if (parts.length == 3 && parts[0].equals(JOB)) {
+      try {
+        return new JobID(parts[1], Integer.parseInt(parts[2]));
+      } catch (NumberFormatException ex) {
+        // keep the parse failure so it is reported with the exception below
+        cause = ex;
+      }
+    }
+    throw new IllegalArgumentException("JobId string : " + str
+        + " is not properly formed", cause);
+  }
+}

Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobID.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Writable snapshot of a job's state: its id, three progress fractions
+ * (setup, main, cleanup), a run-state code, a start time and a free-form
+ * scheduling-info string.
+ */
+public class JobStatus implements Writable, Cloneable {
+
+  static {
+    // Register a factory so instances can be created through WritableFactories.
+    WritableFactories.setFactory(JobStatus.class, new WritableFactory() {
+      public Writable newInstance() {
+        return new JobStatus();
+      }
+    });
+  }
+
+  public static final int RUNNING = 1;
+  public static final int SUCCEEDED = 2;
+  public static final int FAILED = 3;
+  public static final int PREP = 4;
+  public static final int KILLED = 5;
+
+  private JobID jobid;
+  private float progress;
+  private float cleanupProgress;
+  private float setupProgress;
+  private int runState;
+  private long startTime;
+  private String schedulingInfo = "NA";
+
+  /** Creates an empty status; fields are populated by {@link #readFields}. */
+  public JobStatus() {
+  }
+
+  /** Creates a status with zero setup and cleanup progress. */
+  public JobStatus(JobID jobid, float progress, int runState) {
+    this(jobid, progress, 0.0f, runState);
+  }
+
+  /** Creates a status with zero setup progress. */
+  public JobStatus(JobID jobid, float progress, float cleanupProgress,
+      int runState) {
+    this(jobid, 0.0f, progress, cleanupProgress, runState);
+  }
+
+  /**
+   * Creates a fully specified status. The progress values are stored as
+   * given, without clamping.
+   */
+  public JobStatus(JobID jobid, float setupProgress, float progress,
+      float cleanupProgress, int runState) {
+    this.jobid = jobid;
+    this.setupProgress = setupProgress;
+    this.progress = progress;
+    this.cleanupProgress = cleanupProgress;
+    this.runState = runState;
+  }
+
+  /** Limits a progress value to the range [0.0, 1.0]. */
+  private static float clamp(float p) {
+    return (float) Math.min(1.0, Math.max(0.0, p));
+  }
+
+  /** @return the id of the job this status describes */
+  public JobID getJobID() {
+    return jobid;
+  }
+
+  /** @return the main progress fraction */
+  public synchronized float progress() {
+    return progress;
+  }
+
+  /** Stores the main progress fraction, clamped to [0.0, 1.0]. */
+  synchronized void setprogress(float p) {
+    this.progress = clamp(p);
+  }
+
+  /** @return the cleanup progress fraction */
+  public synchronized float cleanupProgress() {
+    return cleanupProgress;
+  }
+
+  /** Stores the cleanup progress fraction, clamped to [0.0, 1.0]. */
+  synchronized void setCleanupProgress(float p) {
+    this.cleanupProgress = clamp(p);
+  }
+
+  /** @return the setup progress fraction */
+  public synchronized float setupProgress() {
+    return setupProgress;
+  }
+
+  /** Stores the setup progress fraction, clamped to [0.0, 1.0]. */
+  synchronized void setSetupProgress(float p) {
+    this.setupProgress = clamp(p);
+  }
+
+  /** @return the current run-state code */
+  public synchronized int getRunState() {
+    return runState;
+  }
+
+  /** Stores the run-state code as given; the value is not validated. */
+  public synchronized void setRunState(int state) {
+    this.runState = state;
+  }
+
+  synchronized void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public synchronized long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Returns a field-by-field copy made by {@code Object.clone()}; the JobID
+   * reference is shared with the original.
+   */
+  @Override
+  public Object clone() {
+    try {
+      return super.clone();
+    } catch (CloneNotSupportedException cnse) {
+      throw new InternalError(cnse.toString());
+    }
+  }
+
+  public synchronized String getSchedulingInfo() {
+    return schedulingInfo;
+  }
+
+  public synchronized void setSchedulingInfo(String schedulingInfo) {
+    this.schedulingInfo = schedulingInfo;
+  }
+
+  /**
+   * @return true when the run state is SUCCEEDED, FAILED or KILLED
+   */
+  public synchronized boolean isJobComplete() {
+    switch (runState) {
+    case SUCCEEDED:
+    case FAILED:
+    case KILLED:
+      return true;
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * Serializes the fields in this order: job id, setup progress, progress,
+   * cleanup progress, run state, start time, scheduling info.
+   */
+  public synchronized void write(DataOutput out) throws IOException {
+    jobid.write(out);
+    out.writeFloat(setupProgress);
+    out.writeFloat(progress);
+    out.writeFloat(cleanupProgress);
+    out.writeInt(runState);
+    out.writeLong(startTime);
+    Text.writeString(out, schedulingInfo);
+  }
+
+  /**
+   * Deserializes the fields in the same order {@link #write} emits them,
+   * replacing the current job id with a freshly created one.
+   */
+  public synchronized void readFields(DataInput in) throws IOException {
+    this.jobid = new JobID();
+    this.jobid.readFields(in);
+    setupProgress = in.readFloat();
+    progress = in.readFloat();
+    cleanupProgress = in.readFloat();
+    runState = in.readInt();
+    startTime = in.readLong();
+    schedulingInfo = Text.readString(in);
+  }
+}

Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Job context specialised for a single task attempt. It remembers the
+ * attempt's id and the most recently set status message.
+ */
+public class TaskAttemptContext extends JobContext implements Progressable {
+  private final TaskAttemptID attemptId;
+  private String statusMessage = "";
+
+  /**
+   * Builds the context for one attempt; the job id handed to the superclass
+   * is taken from the attempt id.
+   *
+   * @param conf the configuration passed on to the superclass
+   * @param taskId the id of the task attempt
+   */
+  public TaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+    super(conf, taskId.getJobID());
+    this.attemptId = taskId;
+  }
+
+  /**
+   * @return the id of this task attempt
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return this.attemptId;
+  }
+
+  /**
+   * Records the given message as the attempt's current status.
+   *
+   * @param msg the new status message
+   * @throws IOException declared by the signature; this implementation only
+   *           stores the message
+   */
+  public void setStatus(String msg) throws IOException {
+    this.statusMessage = msg;
+  }
+
+  /**
+   * @return the message most recently passed to setStatus, or the empty
+   *         string if none has been set
+   */
+  public String getStatus() {
+    return this.statusMessage;
+  }
+
+  /**
+   * Progress callback. This implementation does nothing; subtypes are
+   * expected to do the actual work.
+   */
+  public void progress() {
+  }
+}

Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Identifier of one attempt at running a task: a {@link TaskID} plus the
+ * attempt number held in the id inherited from {@link ID}. The string form is
+ * "attempt" followed by the task id's parts, SEPARATOR and the attempt
+ * number.
+ */
+public class TaskAttemptID extends ID {
+  protected static final String ATTEMPT = "attempt";
+  private TaskID taskId;
+
+  /**
+   * Creates an attempt id for an existing task id.
+   *
+   * @param taskId the task this attempt belongs to, must not be null
+   * @param id the attempt number
+   * @throws IllegalArgumentException if {@code taskId} is null
+   */
+  public TaskAttemptID(TaskID taskId, int id) {
+    super(id);
+    if (taskId == null) {
+      throw new IllegalArgumentException("taskId cannot be null");
+    }
+    this.taskId = taskId;
+  }
+
+  /** Creates an attempt id, building the enclosing task id from its parts. */
+  public TaskAttemptID(String jtIdentifier, int jobId, boolean isMatrixTask,
+      int taskId, int id) {
+    this(new TaskID(jtIdentifier, jobId, isMatrixTask, taskId), id);
+  }
+
+  /** Creates an empty id whose fields are filled in by {@link #readFields}. */
+  public TaskAttemptID() {
+    taskId = new TaskID();
+  }
+
+  /** @return the job id of the enclosing task id */
+  public JobID getJobID() {
+    return taskId.getJobID();
+  }
+
+  /** @return the task id this attempt belongs to */
+  public TaskID getTaskID() {
+    return taskId;
+  }
+
+  /**
+   * Two attempt ids are equal when the superclass considers them equal and
+   * their task ids match.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (!super.equals(o))
+      return false;
+
+    // NOTE(review): the cast relies on ID.equals rejecting null and objects
+    // of a different class - confirm against ID.
+    TaskAttemptID that = (TaskAttemptID) o;
+    return this.taskId.equals(that.taskId);
+  }
+
+  /**
+   * Appends the task id's parts, SEPARATOR and the unpadded attempt number.
+   *
+   * @param builder the builder to append to
+   * @return the same builder, for chaining
+   */
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return taskId.appendTo(builder).append(SEPARATOR).append(id);
+  }
+
+  /** Reads the superclass fields first, then the task id. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    taskId.readFields(in);
+  }
+
+  /** Writes the superclass fields first, then the task id. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    taskId.write(out);
+  }
+
+  @Override
+  public int hashCode() {
+    return taskId.hashCode() * 5 + id;
+  }
+
+  /**
+   * Orders by task id first, then by attempt number.
+   *
+   * @throws ClassCastException if {@code o} is not a TaskAttemptID
+   */
+  @Override
+  public int compareTo(ID o) {
+    TaskAttemptID that = (TaskAttemptID) o;
+    int tipComp = this.taskId.compareTo(that.taskId);
+    if (tipComp != 0) {
+      return tipComp;
+    }
+    // Explicit comparison instead of "this.id - that.id": the subtraction
+    // overflows (and flips sign) when the ids have opposite signs.
+    return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(ATTEMPT)).toString();
+  }
+
+  /**
+   * Parses a string with six SEPARATOR-delimited parts: "attempt", the jt
+   * identifier, the job number, "m" or "g", the task number and the attempt
+   * number.
+   *
+   * @param str the string to parse, may be null
+   * @return the parsed id, or null when {@code str} is null
+   * @throws IllegalArgumentException if the string does not have that shape
+   *           (a numeric parse failure is attached as the cause)
+   */
+  public static TaskAttemptID forName(String str)
+      throws IllegalArgumentException {
+    if (str == null)
+      return null;
+
+    NumberFormatException cause = null;
+    String[] parts = str.split(Character.toString(SEPARATOR));
+    if (parts.length == 6 && parts[0].equals(ATTEMPT)) {
+      boolean isMatrixTask = parts[3].equals("m");
+      // Only "m" and "g" are valid task-kind markers; anything else falls
+      // through to the IllegalArgumentException below.
+      if (isMatrixTask || parts[3].equals("g")) {
+        try {
+          return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
+              isMatrixTask, Integer.parseInt(parts[4]), Integer
+                  .parseInt(parts[5]));
+        } catch (NumberFormatException ex) {
+          // keep the parse failure so it is reported with the exception below
+          cause = ex;
+        }
+      }
+    }
+    throw new IllegalArgumentException("TaskAttemptId string : " + str
+        + " is not properly formed", cause);
+  }
+}

Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java?rev=937774&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java Sun Apr 25 10:03:28 2010
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+/**
+ * Identifier of a task within a job: a {@link JobID}, a task-kind flag and
+ * the task number held in the id inherited from {@link ID}. The string form
+ * is "task" followed by the job id's parts, SEPARATOR, 'm' or 'g', SEPARATOR
+ * and the task number zero-padded to at least six digits.
+ */
+public class TaskID extends ID {
+  protected static final String TASK = "task";
+  /** Formats the task number without grouping and with at least 6 digits. */
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+
+  private JobID jobId;
+  private boolean isMatrixTask;
+
+  /**
+   * Creates a task id.
+   *
+   * @param jobId the job this task belongs to, must not be null
+   * @param isMatrixTask task-kind flag; true is rendered as 'm', false as 'g'
+   * @param id the task number
+   * @throws IllegalArgumentException if {@code jobId} is null
+   */
+  public TaskID(JobID jobId, boolean isMatrixTask, int id) {
+    super(id);
+    if (jobId == null) {
+      throw new IllegalArgumentException("jobId cannot be null");
+    }
+    this.jobId = jobId;
+    this.isMatrixTask = isMatrixTask;
+  }
+
+  /**
+   * Creates a task id, building the enclosing job id from its parts.
+   * NOTE(review): the parameter is named isGraphTask but is stored in the
+   * isMatrixTask field unchanged - confirm the intended meaning.
+   */
+  public TaskID(String jtIdentifier, int jobId, boolean isGraphTask, int id) {
+    this(new JobID(jtIdentifier, jobId), isGraphTask, id);
+  }
+
+  /** Creates an empty id whose fields are filled in by {@link #readFields}. */
+  public TaskID() {
+    jobId = new JobID();
+  }
+
+  /** Returns the {@link JobID} object that this tip belongs to */
+  public JobID getJobID() {
+    return jobId;
+  }
+
+  /**
+   * Returns the task-kind flag given to the constructors.
+   * NOTE(review): despite the method name, this returns the isMatrixTask
+   * field, i.e. it is true for ids rendered with 'm' - confirm what callers
+   * expect before relying on the name.
+   */
+  public boolean isGraphTask() {
+    return isMatrixTask;
+  }
+
+  /**
+   * Two task ids are equal when the superclass considers them equal and both
+   * the task-kind flag and the job id match.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (!super.equals(o))
+      return false;
+
+    // NOTE(review): the cast relies on ID.equals rejecting null and objects
+    // of a different class - confirm against ID.
+    TaskID that = (TaskID) o;
+    return this.isMatrixTask == that.isMatrixTask
+        && this.jobId.equals(that.jobId);
+  }
+
+  /**
+   * Orders by job id, then by task kind (flag set sorts first), then by task
+   * number.
+   *
+   * @throws ClassCastException if {@code o} is not a TaskID
+   */
+  @Override
+  public int compareTo(ID o) {
+    TaskID that = (TaskID) o;
+    int jobComp = this.jobId.compareTo(that.jobId);
+    if (jobComp != 0) {
+      return jobComp;
+    }
+    if (this.isMatrixTask != that.isMatrixTask) {
+      return this.isMatrixTask ? -1 : 1;
+    }
+    // Explicit comparison instead of "this.id - that.id": the subtraction
+    // overflows (and flips sign) when the ids have opposite signs.
+    return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(TASK)).toString();
+  }
+
+  /**
+   * Appends the job id's parts, SEPARATOR, the task-kind character,
+   * SEPARATOR and the formatted task number.
+   *
+   * @param builder the builder to append to
+   * @return the same builder, for chaining
+   */
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return jobId.appendTo(builder).append(SEPARATOR).append(
+        isMatrixTask ? 'm' : 'g').append(SEPARATOR).append(idFormat.format(id));
+  }
+
+  @Override
+  public int hashCode() {
+    return jobId.hashCode() * 524287 + id;
+  }
+
+  /** Reads the superclass fields, then the job id, then the kind flag. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    jobId.readFields(in);
+    isMatrixTask = in.readBoolean();
+  }
+
+  /** Writes the superclass fields, then the job id, then the kind flag. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    jobId.write(out);
+    out.writeBoolean(isMatrixTask);
+  }
+
+  /**
+   * Parses a string with five "_"-delimited parts: "task", the jt identifier,
+   * the job number, "m" or "g", and the task number.
+   *
+   * @param str the string to parse, may be null
+   * @return the parsed id, or null when {@code str} is null
+   * @throws IllegalArgumentException if the string does not have that shape
+   *           (a numeric parse failure is attached as the cause)
+   */
+  public static TaskID forName(String str) throws IllegalArgumentException {
+    if (str == null)
+      return null;
+
+    NumberFormatException cause = null;
+    String[] parts = str.split("_");
+    if (parts.length == 5 && parts[0].equals(TASK)) {
+      boolean isMatrixTask = parts[3].equals("m");
+      // Only "m" and "g" are valid task-kind markers; anything else falls
+      // through to the IllegalArgumentException below.
+      if (isMatrixTask || parts[3].equals("g")) {
+        try {
+          return new TaskID(parts[1], Integer.parseInt(parts[2]), isMatrixTask,
+              Integer.parseInt(parts[4]));
+        } catch (NumberFormatException ex) {
+          // keep the parse failure so it is reported with the exception below
+          cause = ex;
+        }
+      }
+    }
+    throw new IllegalArgumentException("TaskId string : " + str
+        + " is not properly formed", cause);
+  }
+}

Propchange: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/InputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/InputFormat.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/InputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/InputFormat.java Sun Apr 25 10:03:28 2010
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hama.bsp.JobContext;
 
 public abstract class InputFormat<KEYIN, VALUEIN> {
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/OutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/OutputCommitter.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/OutputCommitter.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/OutputCommitter.java Sun Apr 25 10:03:28 2010
@@ -20,6 +20,9 @@ package org.apache.hama.graph;
 
 import java.io.IOException;
 
+import org.apache.hama.bsp.JobContext;
+import org.apache.hama.bsp.TaskAttemptContext;
+
 /**
  * 
  * @see JobContext

Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/OutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/OutputFormat.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/OutputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/OutputFormat.java Sun Apr 25 10:03:28 2010
@@ -21,6 +21,8 @@ package org.apache.hama.graph;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hama.bsp.JobContext;
+import org.apache.hama.bsp.TaskAttemptContext;
 
 /**
  * <code>OutputFormat</code> describes the output-specification for a Angrapa

Modified: incubator/hama/trunk/src/java/org/apache/hama/graph/RecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/graph/RecordWriter.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/graph/RecordWriter.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/graph/RecordWriter.java Sun Apr 25 10:03:28 2010
@@ -21,6 +21,7 @@ package org.apache.hama.graph;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hama.bsp.TaskAttemptContext;
 
 /**
  * <code>RecordWriter</code> writes the output &lt;key, value&gt; pairs to an

Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=937774&r1=937773&r2=937774&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java Sun Apr 25 10:03:28 2010
@@ -21,8 +21,8 @@ package org.apache.hama.ipc;
 
 import java.io.IOException;
 
-import org.apache.hama.graph.JobID;
-import org.apache.hama.graph.JobStatus;
+import org.apache.hama.bsp.JobID;
+import org.apache.hama.bsp.JobStatus;
 
 /**
  * Protocol that a Walker and the central Master use to communicate. This