You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2011/08/01 16:12:56 UTC

svn commit: r1152788 [3/9] - in /incubator/hama/trunk: ./ bin/ conf/ core/ core/bin/ core/conf/ core/src/ core/src/main/ core/src/main/java/ core/src/main/java/org/ core/src/main/java/org/apache/ core/src/main/java/org/apache/hama/ core/src/main/java/o...

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BooleanMessage.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A message that consists of a string tag and a boolean value. 
+ */
+public class BooleanMessage extends BSPMessage {
+
+  String tag;
+  boolean data;
+
+  public BooleanMessage() {
+    super();
+  }
+
+  public BooleanMessage(String tag, boolean data) {
+    super();
+    this.tag = tag;
+    this.data = data;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(tag);
+    out.writeBoolean(data);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    tag = in.readUTF();
+    data = in.readBoolean();
+  }
+
+  @Override
+  public String getTag() {
+    return tag;
+  }
+
+  @Override
+  public Boolean getData() {
+    return data;
+  }
+}
\ No newline at end of file

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ByteMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ByteMessage.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ByteMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ByteMessage.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A message that consists of a byte tag and a byte data.
+ */
+public class ByteMessage extends BSPMessage {
+
+  private byte[] tag;
+  private byte[] data;
+
+  public ByteMessage() {
+    super();
+  }
+
+  public ByteMessage(byte[] tag, byte[] data) {
+    super();
+    this.tag = tag;
+    this.data = data;
+  }
+
+  @Override
+  public byte[] getTag() {
+    return this.tag;
+  }
+
+  @Override
+  public byte[] getData() {
+    return this.data;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.tag = new byte[in.readInt()];
+    in.readFully(tag, 0, this.tag.length);
+    this.data = new byte[in.readInt()];
+    in.readFully(data, 0, this.data.length);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(tag.length);
+    out.write(tag);
+    out.writeInt(data.length);
+    out.write(data);
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Status information on the current state of the BSP cluster.
+ * 
+ * <p><code>ClusterStatus</code> provides clients with information such as:
+ * <ol>
+ *   <li>
+ *   Size of the cluster. 
+ *   </li>
+ *   <li>
+ *   Name of the grooms. 
+ *   </li>
+ *   <li>
+ *   Task capacity of the cluster. 
+ *   </li>
+ *   <li>
+ *   The number of currently running bsp tasks.
+ *   </li>
+ *   <li>
+ *   State of the <code>BSPMaster</code>.
+ *   </li>
+ * </ol></p>
+ * 
+ * <p>Clients can query for the latest <code>ClusterStatus</code>, via 
+ * {@link BSPJobClient#getClusterStatus(boolean)}.</p>
+ * 
+ * @see BSPMaster
+ */
+public class ClusterStatus implements Writable {
+
+  private int numActiveGrooms;
+  private Map<String, String> activeGrooms = new HashMap<String, String>();
+  private int tasks;
+  private int maxTasks;
+  private BSPMaster.State state;
+  
+  /**
+   * 
+   */
+  public ClusterStatus() {}
+    
+  public ClusterStatus(int grooms, int tasks, int maxTasks, BSPMaster.State state) {
+    this.numActiveGrooms = grooms;
+    this.tasks = tasks;
+    this.maxTasks = maxTasks;
+    this.state = state;
+  }
+  
+  public ClusterStatus(Map<String, String> activeGrooms, int tasks, int maxTasks,
+      BSPMaster.State state) {
+    this(activeGrooms.size(), tasks, maxTasks, state);
+    this.activeGrooms = activeGrooms;
+  }
+  
+  /**
+   * Get the number of groom servers in the cluster.
+   * 
+   * @return the number of groom servers in the cluster.
+   */
+  public int getGroomServers() {
+    return numActiveGrooms;
+  }
+  
+  /**
+   * Get the names of groom servers, and their peers, in the cluster.
+   * 
+   * @return the active groom servers in the cluster.
+   */  
+  public Map<String, String> getActiveGroomNames() {
+    return activeGrooms;
+  }
+  
+  /**
+   * Get the number of currently running tasks in the cluster.
+   * 
+   * @return the number of currently running tasks in the cluster.
+   */
+  public int getTasks() {
+    return tasks;
+  }
+  
+  /**
+   * Get the maximum capacity for running tasks in the cluster.
+   * 
+   * @return the maximum capacity for running tasks in the cluster.
+   */
+  public int getMaxTasks() {
+    return maxTasks;
+  }
+  
+  /**
+   * Get the current state of the <code>BSPMaster</code>, 
+   * as {@link BSPMaster.State}
+   * 
+   * @return the current state of the <code>BSPMaster</code>.
+   */
+  public BSPMaster.State getBSPMasterState() {
+    return state;
+  }
+  
+  //////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (activeGrooms.isEmpty()) {
+      out.writeInt(numActiveGrooms);
+      out.writeBoolean(false);
+    } else {
+      out.writeInt(activeGrooms.size());
+      out.writeBoolean(true);
+
+      String[] groomNames = activeGrooms.keySet().toArray(new String[0]);
+      List<String> peerNames = new ArrayList<String>();
+
+      for (String groomName : groomNames) {
+        peerNames.add(activeGrooms.get(groomName));
+      }
+
+      WritableUtils.writeCompressedStringArray(out, groomNames);
+      WritableUtils.writeCompressedStringArray(out, peerNames.toArray(new String[0]));
+    }
+    out.writeInt(tasks);
+    out.writeInt(maxTasks);
+    WritableUtils.writeEnum(out, state);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    numActiveGrooms = in.readInt();
+    boolean groomListFollows = in.readBoolean();
+
+    if (groomListFollows) {
+      String[] groomNames = WritableUtils.readCompressedStringArray(in);
+      String[] peerNames = WritableUtils.readCompressedStringArray(in);
+      activeGrooms = new HashMap<String, String>(groomNames.length);
+
+      for (int i = 0; i < groomNames.length; i++) {
+        activeGrooms.put(groomNames[i], peerNames[i]);
+      }
+    }
+
+    tasks = in.readInt();
+    maxTasks = in.readInt();
+    state = WritableUtils.readEnum(in, BSPMaster.State.class);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CommitTaskAction.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} 
+ * to the {@link org.apache.hama.bsp.GroomServer} to commit the output
+ * of the task.
+ */
+class CommitTaskAction extends GroomServerAction {
+  private TaskAttemptID taskId;
+  
+  public CommitTaskAction() {
+    super(ActionType.COMMIT_TASK);
+    taskId = new TaskAttemptID();
+  }
+  
+  public CommitTaskAction(TaskAttemptID taskId) {
+    super(ActionType.COMMIT_TASK);
+    this.taskId = taskId;
+  }
+  
+  public TaskAttemptID getTaskID() {
+    return taskId;
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    taskId.readFields(in);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Directive.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
+ */
+public class Directive implements Writable{
+
+  protected long timestamp;
+  protected Directive.Type type;
+
+  public static enum Type {
+    Request(1), Response(2);
+    int t;
+
+    Type(int t) {
+      this.t = t;
+    }
+
+    public int value() {
+      return this.t;
+    }
+  };
+
+  public Directive(){}
+
+  public Directive(Directive.Type type) {
+    this.timestamp = System.currentTimeMillis();
+    this.type = type;
+  }
+
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  public Directive.Type getType() {
+    return this.type;
+  }
+
+  /**
+   * Command for BSPMaster or GroomServer to execute.
+  public abstract void execute() throws Exception;
+   */
+
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(this.timestamp);
+    out.writeInt(this.type.value());
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.timestamp = in.readLong();
+    int t = in.readInt();
+    if (Directive.Type.Request.value() == t) {
+      this.type = Directive.Type.Request;
+    }else{
+      this.type = Directive.Type.Response;
+    }
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveException.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveException.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveException.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveException.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * A custom exception class for Directive.
+ */
+public class DirectiveException extends RuntimeException{
+  private static final long serialVersionUID = -8052582046894492822L;
+
+  public DirectiveException(){ 
+    super(); 
+  }
+
+  public DirectiveException(String message){ 
+    super(message); 
+  }
+
+  public DirectiveException(String message, Throwable t){
+    super(message, t);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DirectiveHandler.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * A DirectiveHandler interface.
+ */
+public interface DirectiveHandler{
+
+  /**
+   * Handle directives on demand. 
+   * @param directive to be handled.
+   */
+  void handle(Directive directive) throws DirectiveException;
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Handles the tasks dispatching between the BSPMaster and the GroomServers.
+ */
+public final class DispatchTasksDirective extends Directive implements Writable {
+
+  public static final Log LOG = LogFactory.getLog(DispatchTasksDirective.class);
+
+  private Map<String, String> groomServerPeers;
+  private GroomServerAction[] actions;
+
+  public DispatchTasksDirective() {
+    super();
+  }
+
+  public DispatchTasksDirective(Map<String, String> groomServerPeers,
+      GroomServerAction[] actions) {
+    super(Directive.Type.Request);
+    this.groomServerPeers = groomServerPeers;
+    this.actions = actions;
+  }
+
+  public Map<String, String> getGroomServerPeers() {
+    return this.groomServerPeers;
+  }
+
+  public GroomServerAction[] getActions() {
+    return this.actions;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    if (this.actions == null) {
+      WritableUtils.writeVInt(out, 0);
+    } else {
+      WritableUtils.writeVInt(out, actions.length);
+      for (GroomServerAction action : this.actions) {
+        WritableUtils.writeEnum(out, action.getActionType());
+        action.write(out);
+      }
+    }
+    String[] groomServerNames = groomServerPeers.keySet()
+        .toArray(new String[0]);
+    WritableUtils.writeCompressedStringArray(out, groomServerNames);
+
+    List<String> groomServerAddresses = new ArrayList<String>(
+        groomServerNames.length);
+    for (String groomName : groomServerNames) {
+      groomServerAddresses.add(groomServerPeers.get(groomName));
+    }
+    WritableUtils.writeCompressedStringArray(out, groomServerAddresses
+        .toArray(new String[0]));
+
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int length = WritableUtils.readVInt(in);
+    if (length > 0) {
+      this.actions = new GroomServerAction[length];
+      for (int i = 0; i < length; ++i) {
+        GroomServerAction.ActionType actionType = WritableUtils.readEnum(in,
+            GroomServerAction.ActionType.class);
+        actions[i] = GroomServerAction.createAction(actionType);
+        actions[i].readFields(in);
+      }
+    } else {
+      this.actions = null;
+    }
+    String[] groomServerNames = WritableUtils.readCompressedStringArray(in);
+    String[] groomServerAddresses = WritableUtils.readCompressedStringArray(in);
+    groomServerPeers = new HashMap<String, String>(groomServerNames.length);
+
+    for (int i = 0; i < groomServerNames.length; i++) {
+      groomServerPeers.put(groomServerNames[i], groomServerAddresses[i]);
+    }
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DoubleMessage.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A message that consists of a string tag and a double data. 
+ */
+public class DoubleMessage extends BSPMessage {
+
+  private String tag;
+  private double data;
+
+  public DoubleMessage() {
+    super();
+  }
+
+  public DoubleMessage(String tag, double data) {
+    super();
+    this.data = data;
+    this.tag = tag;
+  }
+
+  @Override
+  public String getTag() {
+    return tag;
+  }
+
+  @Override
+  public Double getData() {
+    return data;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(tag);
+    out.writeDouble(data);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    tag = in.readUTF();
+    data = in.readDouble();
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FCFSQueue.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+class FCFSQueue implements Queue<JobInProgress> {
+
+  public static final Log LOG = LogFactory.getLog(FCFSQueue.class);
+  private final String name;
+  private BlockingQueue<JobInProgress> queue = new LinkedBlockingQueue<JobInProgress>();
+
+  public FCFSQueue(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public String getName() {
+    return this.name;
+  }
+
+  @Override
+  public void addJob(JobInProgress job) {
+    try {
+      queue.put(job);
+    } catch (InterruptedException ie) {
+      LOG.error("Fail to add a job to the " + this.name + " queue.", ie);
+    }
+  }
+
+  @Override
+  public void removeJob(JobInProgress job) {
+    queue.remove(job);
+  }
+
+  @Override
+  public JobInProgress removeJob() {
+    try {
+      return queue.take();
+    } catch (InterruptedException ie) {
+      LOG.error("Fail to remove a job from the " + this.name + " queue.", ie);
+    }
+    return null;
+  }
+
+  @Override
+  public Collection<JobInProgress> jobs() {
+    return queue;
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,920 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.ipc.BSPPeerProtocol;
+import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.ipc.GroomProtocol;
+import org.apache.log4j.LogManager;
+
+/**
+ * A Groom Server (shortly referred to as groom) is a process that performs bsp
+ * tasks assigned by BSPMaster. Each groom contacts the BSPMaster, and it takes
+ * assigned tasks and reports its status by means of periodical piggybacks with
+ * BSPMaster. Each groom is designed to run with HDFS or other distributed
+ * storages. Basically, a groom server and a data node should be run on one
+ * physical node.
+ */
+public class GroomServer implements Runnable, GroomProtocol, BSPPeerProtocol {
+  public static final Log LOG = LogFactory.getLog(GroomServer.class);
+  static final String SUBDIR = "groomServer";
+
+  private volatile static int REPORT_INTERVAL = 1 * 1000;
+
+  Configuration conf;
+
+  // Constants
+  static enum State {
+    NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
+  };
+
+  // Running States and its related things
+  volatile boolean initialized = false;
+  volatile boolean running = true;
+  volatile boolean shuttingDown = false;
+  boolean justInited = true;
+  GroomServerStatus status = null;
+
+  // Attributes
+  String groomServerName;
+  String localHostname;
+  String groomHostName;
+  InetSocketAddress bspMasterAddr;
+  private Instructor instructor;
+
+  // Filesystem
+  // private LocalDirAllocator localDirAllocator;
+  Path systemDirectory = null;
+  FileSystem systemFS = null;
+
+  // Job
+  private int failures;
+  private int maxCurrentTasks = 1;
+  Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
+  /** Map from taskId -> TaskInProgress. */
+  Map<TaskAttemptID, TaskInProgress> runningTasks = null;
+  Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
+  Map<BSPJobID, RunningJob> runningJobs = null;
+
+  // new nexus between GroomServer and BSPMaster
+  // holds/ manage all tasks
+  // List<TaskInProgress> tasksList = new
+  // CopyOnWriteArrayList<TaskInProgress>();
+
+  private String rpcServer;
+  private Server workerServer;
+  MasterProtocol masterClient;
+
+  InetSocketAddress taskReportAddress;
+  Server taskReportServer = null;
+
+  private PeerNames allPeerNames = null;
+
+  // private BlockingQueue<GroomServerAction> tasksToCleanup = new
+  // LinkedBlockingQueue<GroomServerAction>();
+
+  private class DispatchTasksHandler implements DirectiveHandler {
+
+    public void handle(Directive directive) throws DirectiveException {
+      GroomServerAction[] actions = ((DispatchTasksDirective) directive)
+          .getActions();
+
+      allPeerNames = new PeerNames(((DispatchTasksDirective) directive)
+          .getGroomServerPeers().values());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got Response from BSPMaster with "
+            + ((actions != null) ? actions.length : 0) + " actions");
+      }
+
+      if (actions != null) {
+        for (GroomServerAction action : actions) {
+          if (action instanceof LaunchTaskAction) {
+            startNewTask((LaunchTaskAction) action);
+          } else {
+
+            // TODO Use the cleanup thread
+            // tasksToCleanup.put(action);
+
+            KillTaskAction killAction = (KillTaskAction) action;
+            if (tasks.containsKey(killAction.getTaskID())) {
+              TaskInProgress tip = tasks.get(killAction.getTaskID());
+              tip.taskStatus.setRunState(TaskStatus.State.FAILED);
+              try {
+                tip.killAndCleanup(true);
+              } catch (IOException ioe) {
+                throw new DirectiveException("Error when killing a "
+                    + "TaskInProgress.", ioe);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private class Instructor extends Thread {
+    final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
+    final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
+
+    public void bind(Class<? extends Directive> instruction,
+        DirectiveHandler handler) {
+      handlers.putIfAbsent(instruction, handler);
+    }
+
+    public void put(Directive directive) {
+      try {
+        buffer.put(directive);
+      } catch (InterruptedException ie) {
+        LOG.error("Unable to put directive into queue.", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    public void run() {
+      while (true) {
+        try {
+          Directive directive = buffer.take();
+          if (directive instanceof DispatchTasksDirective) {
+            ((DirectiveHandler) handlers.get(DispatchTasksDirective.class))
+                .handle(directive);
+          } else {
+            throw new RuntimeException("Directive is not supported."
+                + directive);
+          }
+        } catch (InterruptedException ie) {
+          LOG.error("Unable to retrieve directive from the queue.", ie);
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          LOG.error("Fail to execute directive.", e);
+        }
+      }
+    }
+  }
+
+  public GroomServer(Configuration conf) throws IOException {
+    LOG.info("groom start");
+    this.conf = conf;
+
+    bspMasterAddr = BSPMaster.getAddress(conf);
+    // FileSystem local = FileSystem.getLocal(conf);
+    // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
+  }
+
+  public synchronized void initialize() throws IOException {
+    if (this.conf.get(Constants.PEER_HOST) != null) {
+      this.localHostname = conf.get(Constants.PEER_HOST);
+    }
+
+    if (localHostname == null) {
+      this.localHostname = DNS.getDefaultHost(
+          conf.get("bsp.dns.interface", "default"),
+          conf.get("bsp.dns.nameserver", "default"));
+    }
+    // check local disk
+    checkLocalDirs(conf.getStrings("bsp.local.dir"));
+    deleteLocalFiles("groomserver");
+
+    // Clear out state tables
+    this.tasks.clear();
+    this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
+    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    this.finishedTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    this.conf.set(Constants.PEER_HOST, localHostname);
+    this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
+
+    int rpcPort = -1;
+    String rpcAddr = null;
+    if (false == this.initialized) {
+      rpcAddr = conf.get(Constants.GROOM_RPC_HOST,
+          Constants.DEFAULT_GROOM_RPC_HOST);
+      rpcPort = conf.getInt(Constants.GROOM_RPC_PORT,
+          Constants.DEFAULT_GROOM_RPC_PORT);
+      if (-1 == rpcPort || null == rpcAddr)
+        throw new IllegalArgumentException("Error rpc address " + rpcAddr
+            + " port" + rpcPort);
+      this.workerServer = RPC.getServer(this, rpcAddr, rpcPort, conf);
+      this.workerServer.start();
+      this.rpcServer = rpcAddr + ":" + rpcPort;
+
+      LOG.info("Worker rpc server --> " + rpcServer);
+    }
+
+    @SuppressWarnings("deprecation")
+    String address = NetUtils.getServerAddress(conf,
+        "bsp.groom.report.bindAddress", "bsp.groom.report.port",
+        "bsp.groom.report.address");
+    InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
+    String bindAddress = socAddr.getHostName();
+    int tmpPort = socAddr.getPort();
+
+    // RPC initialization
+    // TODO numHandlers should be a ..
+    this.taskReportServer = RPC.getServer(this, bindAddress, tmpPort, 10,
+        false, this.conf);
+
+    this.taskReportServer.start();
+
+    // get the assigned address
+    this.taskReportAddress = taskReportServer.getListenerAddress();
+    this.conf.set("bsp.groom.report.address", taskReportAddress.getHostName()
+        + ":" + taskReportAddress.getPort());
+    LOG.info("GroomServer up at: " + this.taskReportAddress);
+
+    this.groomHostName = rpcAddr;
+    this.groomServerName = "groomd_" + this.rpcServer.replace(':', '_');
+    LOG.info("Starting groom: " + this.rpcServer);
+
+    DistributedCache.purgeCache(this.conf);
+
+    // establish the communication link to bsp master
+    this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class,
+        MasterProtocol.versionID, bspMasterAddr, conf);
+
+    // enroll in bsp master
+    if (-1 == rpcPort || null == rpcAddr)
+      throw new IllegalArgumentException("Error rpc address " + rpcAddr
+          + " port" + rpcPort);
+    if (!this.masterClient.register(new GroomServerStatus(groomServerName,
+        getBspPeerName(), cloneAndResetRunningTaskStatuses(), failures,
+        maxCurrentTasks, this.rpcServer))) {
+      LOG.error("There is a problem in establishing communication"
+          + " link with BSPMaster");
+      throw new IOException("There is a problem in establishing"
+          + " communication link with BSPMaster.");
+    }
+
+    this.instructor = new Instructor();
+    this.instructor.bind(DispatchTasksDirective.class,
+        new DispatchTasksHandler());
+    instructor.start();
+    this.running = true;
+    this.initialized = true;
+  }
+
+  /** Return the port at which the tasktracker bound to */
+  public synchronized InetSocketAddress getTaskTrackerReportAddress() {
+    return taskReportAddress;
+  }
+
+  @Override
+  public void dispatch(Directive directive) throws IOException {
+    if (!instructor.isAlive())
+      throw new IOException();
+
+    instructor.put(directive);
+  }
+
+  private static void checkLocalDirs(String[] localDirs)
+      throws DiskErrorException {
+    boolean writable = false;
+
+    LOG.info(localDirs);
+
+    if (localDirs != null) {
+      for (int i = 0; i < localDirs.length; i++) {
+        try {
+          LOG.info(localDirs[i]);
+          DiskChecker.checkDir(new File(localDirs[i]));
+          writable = true;
+        } catch (DiskErrorException e) {
+          LOG.warn("BSP Processor local " + e.getMessage());
+        }
+      }
+    }
+
+    if (!writable)
+      throw new DiskErrorException("all local directories are not writable");
+  }
+
+  public String[] getLocalDirs() {
+    return conf.getStrings("bsp.local.dir");
+  }
+
+  public void deleteLocalFiles() throws IOException {
+    String[] localDirs = getLocalDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true);
+    }
+  }
+
+  public void deleteLocalFiles(String subdir) throws IOException {
+    try {
+      String[] localDirs = getLocalDirs();
+      for (int i = 0; i < localDirs.length; i++) {
+        FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir),
+            true);
+      }
+    } catch (NullPointerException e) {
+      LOG.info(e);
+    }
+  }
+
+  public void cleanupStorage() throws IOException {
+    deleteLocalFiles();
+  }
+
+  private void startCleanupThreads() throws IOException {
+
+  }
+
+  public State offerService() throws Exception {
+    while (running && !shuttingDown) {
+      try {
+
+        // Reports to a BSPMaster
+        for (Map.Entry<TaskAttemptID, TaskInProgress> e : runningTasks
+            .entrySet()) {
+          Thread.sleep(REPORT_INTERVAL);
+          TaskInProgress tip = e.getValue();
+          TaskStatus taskStatus = tip.getStatus();
+
+          if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+            taskStatus.setProgress(taskStatus.getSuperstepCount());
+
+            if (!tip.runner.isAlive()) {
+              if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
+                taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+              }
+              taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
+            }
+          }
+
+          doReport(taskStatus);
+        }
+
+        Thread.sleep(REPORT_INTERVAL);
+      } catch (InterruptedException ie) {
+      }
+
+      try {
+        if (justInited) {
+          String dir = masterClient.getSystemDir();
+          if (dir == null) {
+            LOG.error("Fail to get system directory.");
+            throw new IOException("Fail to get system directory.");
+          }
+          systemDirectory = new Path(dir);
+          systemFS = systemDirectory.getFileSystem(conf);
+        }
+        justInited = false;
+      } catch (DiskErrorException de) {
+        String msg = "Exiting groom server for disk error:\n"
+            + StringUtils.stringifyException(de);
+        LOG.error(msg);
+
+        return State.STALE;
+      } catch (RemoteException re) {
+        return State.DENIED;
+      } catch (Exception except) {
+        String msg = "Caught exception: "
+            + StringUtils.stringifyException(except);
+        LOG.error(msg);
+      }
+    }
+    return State.NORMAL;
+  }
+
+  private void startNewTask(LaunchTaskAction action) {
+    Task t = action.getTask();
+    BSPJob jobConf = null;
+    try {
+      jobConf = new BSPJob(t.getJobID(), t.getJobFile());
+    } catch (IOException e1) {
+      LOG.error(e1);
+    }
+
+    TaskInProgress tip = new TaskInProgress(t, jobConf, this.groomServerName);
+
+    synchronized (this) {
+      tasks.put(t.getTaskID(), tip);
+      runningTasks.put(t.getTaskID(), tip);
+    }
+
+    try {
+      localizeJob(tip);
+    } catch (Throwable e) {
+      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
+          .stringifyException(e));
+      LOG.warn(msg);
+    }
+  }
+
+  /**
+   * Update and report refresh status back to BSPMaster.
+   */
+  public void doReport(TaskStatus taskStatus) {
+    GroomServerStatus groomStatus = new GroomServerStatus(groomServerName,
+        getBspPeerName(), updateTaskStatus(taskStatus), failures,
+        maxCurrentTasks, rpcServer);
+    try {
+      boolean ret = masterClient.report(new ReportGroomStatusDirective(
+          groomStatus));
+      if (!ret) {
+        LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
+            + " groom name: " + groomStatus.getGroomName() + " peer name:"
+            + groomStatus.getPeerName() + " rpc server:" + rpcServer);
+      }
+    } catch (IOException ioe) {
+      LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
+    }
+  }
+
+  public List<TaskStatus> updateTaskStatus(TaskStatus taskStatus) {
+    List<TaskStatus> tlist = new ArrayList<TaskStatus>();
+    synchronized (runningTasks) {
+
+      if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED
+          || taskStatus.getRunState() == TaskStatus.State.FAILED) {
+        synchronized (finishedTasks) {
+          TaskInProgress tip = runningTasks.remove(taskStatus.getTaskId());
+          tlist.add((TaskStatus) taskStatus.clone());
+          finishedTasks.put(taskStatus.getTaskId(), tip);
+        }
+      } else if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+        tlist.add((TaskStatus) taskStatus.clone());
+      }
+
+    }
+    return tlist;
+  }
+
+  private void localizeJob(TaskInProgress tip) throws IOException {
+    Task task = tip.getTask();
+    conf.addResource(task.getJobFile());
+    BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
+    Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+        + task.getTaskID() + "/" + "job.xml");
+
+    RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
+    BSPJob jobConf = null;
+
+    synchronized (rjob) {
+      if (!rjob.localized) {
+        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+            + task.getTaskID() + "/" + "job.jar");
+        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
+
+        HamaConfiguration conf = new HamaConfiguration();
+        conf.addResource(localJobFile);
+        jobConf = new BSPJob(conf, task.getJobID().toString());
+
+        Path jarFile = new Path(jobConf.getJar());
+        jobConf.setJar(localJarFile.toString());
+
+        if (jarFile != null) {
+          systemFS.copyToLocalFile(jarFile, localJarFile);
+
+          // also unjar the job.jar files in workdir
+          File workDir = new File(
+              new File(localJobFile.toString()).getParent(), "work");
+          if (!workDir.mkdirs()) {
+            if (!workDir.isDirectory()) {
+              throw new IOException("Mkdirs failed to create "
+                  + workDir.toString());
+            }
+          }
+          RunJar.unJar(new File(localJarFile.toString()), workDir);
+        }
+        rjob.localized = true;
+      }
+    }
+
+    launchTaskForJob(tip, jobConf);
+  }
+
+  private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
+    try {
+      tip.setJobConf(jobConf);
+      tip.launchTask();
+    } catch (Throwable ie) {
+      tip.taskStatus.setRunState(TaskStatus.State.FAILED);
+      String error = StringUtils.stringifyException(ie);
+      LOG.info(error);
+    }
+  }
+
+  private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
+      TaskInProgress tip) {
+    synchronized (runningJobs) {
+      RunningJob rJob = null;
+      if (!runningJobs.containsKey(jobId)) {
+        rJob = new RunningJob(jobId, localJobFile);
+        rJob.localized = false;
+        rJob.tasks = new HashSet<TaskInProgress>();
+        rJob.jobFile = localJobFile;
+        runningJobs.put(jobId, rJob);
+      } else {
+        rJob = runningJobs.get(jobId);
+      }
+      rJob.tasks.add(tip);
+      return rJob;
+    }
+  }
+
+  /**
+   * The datastructure for initializing a job
+   */
+  static class RunningJob {
+    private BSPJobID jobid;
+    private Path jobFile;
+    // keep this for later use
+    Set<TaskInProgress> tasks;
+    boolean localized;
+    boolean keepJobFiles;
+
+    RunningJob(BSPJobID jobid, Path jobFile) {
+      this.jobid = jobid;
+      localized = false;
+      tasks = new HashSet<TaskInProgress>();
+      this.jobFile = jobFile;
+      keepJobFiles = false;
+    }
+
+    Path getJobFile() {
+      return jobFile;
+    }
+
+    BSPJobID getJobId() {
+      return jobid;
+    }
+  }
+
+  private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() {
+    List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
+    for (TaskInProgress tip : runningTasks.values()) {
+      TaskStatus status = tip.getStatus();
+      result.add((TaskStatus) status.clone());
+    }
+    return result;
+  }
+
+  public void run() {
+    try {
+      initialize();
+      startCleanupThreads();
+      boolean denied = false;
+      while (running && !shuttingDown && !denied) {
+
+        boolean staleState = false;
+        try {
+          while (running && !staleState && !shuttingDown && !denied) {
+            try {
+              State osState = offerService();
+              if (osState == State.STALE) {
+                staleState = true;
+              } else if (osState == State.DENIED) {
+                denied = true;
+              }
+            } catch (Exception e) {
+              if (!shuttingDown) {
+                LOG.info("Lost connection to BSP Master [" + bspMasterAddr
+                    + "].  Retrying...", e);
+                try {
+                  Thread.sleep(5000);
+                } catch (InterruptedException ie) {
+                }
+              }
+            }
+          }
+        } finally {
+          // close();
+        }
+        if (shuttingDown) {
+          return;
+        }
+        LOG.warn("Reinitializing local state");
+        initialize();
+      }
+    } catch (IOException ioe) {
+      LOG.error("Got fatal exception while reinitializing GroomServer: "
+          + StringUtils.stringifyException(ioe));
+      return;
+    }
+  }
+
+  public synchronized void shutdown() throws IOException {
+    shuttingDown = true;
+    close();
+  }
+
+  public synchronized void close() throws IOException {
+    this.running = false;
+    this.initialized = false;
+    cleanupStorage();
+    this.workerServer.stop();
+    RPC.stopProxy(masterClient);
+
+    if (taskReportServer != null) {
+      taskReportServer.stop();
+      taskReportServer = null;
+    }
+  }
+
+  public static Thread startGroomServer(final GroomServer hrs) {
+    return startGroomServer(hrs, "regionserver" + hrs.groomServerName);
+  }
+
+  public static Thread startGroomServer(final GroomServer hrs, final String name) {
+    Thread t = new Thread(hrs);
+    t.setName(name);
+    t.start();
+    return t;
+  }
+
+  // /////////////////////////////////////////////////////
+  // TaskInProgress maintains all the info for a Task that
+  // lives at this GroomServer. It maintains the Task object,
+  // its TaskStatus, and the BSPTaskRunner.
+  // /////////////////////////////////////////////////////
+  class TaskInProgress {
+    Task task;
+    BSPJob jobConf;
+    BSPJob localJobConf;
+    BSPTaskRunner runner;
+    volatile boolean done = false;
+    volatile boolean wasKilled = false;
+    private TaskStatus taskStatus;
+
+    public TaskInProgress(Task task, BSPJob jobConf, String groomServer) {
+      this.task = task;
+      this.jobConf = jobConf;
+      this.localJobConf = null;
+      this.taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(), 0,
+          TaskStatus.State.UNASSIGNED, "running", groomServer,
+          TaskStatus.Phase.STARTING);
+    }
+
+    private void localizeTask(Task task) throws IOException {
+      Path localJobFile = this.jobConf.getLocalPath(SUBDIR + "/"
+          + task.getTaskID() + "/job.xml");
+      Path localJarFile = this.jobConf.getLocalPath(SUBDIR + "/"
+          + task.getTaskID() + "/job.jar");
+
+      String jobFile = task.getJobFile();
+      systemFS.copyToLocalFile(new Path(jobFile), localJobFile);
+      task.setJobFile(localJobFile.toString());
+
+      localJobConf = new BSPJob(task.getJobID(), localJobFile.toString());
+      localJobConf.set("bsp.task.id", task.getTaskID().toString());
+      String jarFile = localJobConf.getJar();
+
+      if (jarFile != null) {
+        systemFS.copyToLocalFile(new Path(jarFile), localJarFile);
+        localJobConf.setJar(localJarFile.toString());
+      }
+
+      LOG.debug("localizeTask : " + localJobConf.getJar());
+      LOG.debug("localizeTask : " + localJobFile.toString());
+
+      task.setConf(localJobConf);
+    }
+
+    public synchronized void setJobConf(BSPJob jobConf) {
+      this.jobConf = jobConf;
+    }
+
+    public synchronized BSPJob getJobConf() {
+      return localJobConf;
+    }
+
+    public void launchTask() throws IOException {
+      localizeTask(task);
+      taskStatus.setRunState(TaskStatus.State.RUNNING);
+      this.runner = task.createRunner(GroomServer.this);
+      this.runner.start();
+    }
+
+    /**
+     * This task has run on too long, and should be killed.
+     */
+    public synchronized void killAndCleanup(boolean wasFailure)
+        throws IOException {
+      runner.kill();
+    }
+
+    /**
+     */
+    public Task getTask() {
+      return task;
+    }
+
+    /**
+     */
+    public synchronized TaskStatus getStatus() {
+      return taskStatus;
+    }
+
+    /**
+     */
+    public TaskStatus.State getRunState() {
+      return taskStatus.getRunState();
+    }
+
+    public boolean wasKilled() {
+      return wasKilled;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return (obj instanceof TaskInProgress)
+          && task.getTaskID().equals(
+              ((TaskInProgress) obj).getTask().getTaskID());
+    }
+
+    @Override
+    public int hashCode() {
+      return task.getTaskID().hashCode();
+    }
+  }
+
+  public boolean isRunning() {
+    return running;
+  }
+
+  public static GroomServer constructGroomServer(
+      Class<? extends GroomServer> groomServerClass, final Configuration conf2) {
+    try {
+      Constructor<? extends GroomServer> c = groomServerClass
+          .getConstructor(Configuration.class);
+      return c.newInstance(conf2);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed construction of " + "Master: "
+          + groomServerClass.toString(), e);
+    }
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    if (protocol.equals(GroomProtocol.class.getName())) {
+      return GroomProtocol.versionID;
+    } else if (protocol.equals(BSPPeerProtocol.class.getName())) {
+      return BSPPeerProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol to GroomServer: " + protocol);
+    }
+  }
+
+  /**
+   * @return bsp peer information in the form of "address:port".
+   */
+  public String getBspPeerName() {
+    // TODO Later, peers list should be returned.
+    return this.groomHostName + ":" + Constants.DEFAULT_PEER_PORT;
+  }
+
+  /**
+   * The main() for child processes.
+   */
+  public static class Child {
+
+    public static void main(String[] args) throws Throwable {
+      LOG.debug("Child starting");
+
+      HamaConfiguration defaultConf = new HamaConfiguration();
+      // report address
+      String host = args[0];
+      int port = Integer.parseInt(args[1]);
+      InetSocketAddress address = new InetSocketAddress(host, port);
+      TaskAttemptID taskid = TaskAttemptID.forName(args[2]);
+
+      // //////////////////
+      BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
+          BSPPeerProtocol.class, BSPPeerProtocol.versionID, address,
+          defaultConf);
+
+      Task task = umbilical.getTask(taskid);
+
+      defaultConf.addResource(new Path(task.getJobFile()));
+      BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());
+
+      defaultConf.set(Constants.PEER_HOST, args[3]);
+      defaultConf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+
+      BSPPeer bspPeer = new BSPPeer(defaultConf, taskid, umbilical);
+      bspPeer.setJobConf(job);
+      bspPeer.setAllPeerNames(umbilical.getAllPeerNames().getAllPeerNames());
+
+      TaskStatus taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(),
+          0, TaskStatus.State.RUNNING, "running", host,
+          TaskStatus.Phase.STARTING);
+
+      bspPeer.setCurrentTaskStatus(taskStatus);
+
+      try {
+        // use job-specified working directory
+        FileSystem.get(job.getConf()).setWorkingDirectory(
+            job.getWorkingDirectory());
+
+        task.run(job, bspPeer, umbilical); // run the task
+
+      } catch (FSError e) {
+        LOG.fatal("FSError from child", e);
+        umbilical.fsError(taskid, e.getMessage());
+      } catch (Throwable throwable) {
+        LOG.warn("Error running child", throwable);
+        // Report back any failures, for diagnostic purposes
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        throwable.printStackTrace(new PrintStream(baos));
+      } finally {
+        bspPeer.close(); // close peer.
+
+        RPC.stopProxy(umbilical);
+        // Shutting down log4j of the child-vm...
+        // This assumes that on return from Task.run()
+        // there is no more logging done.
+        LogManager.shutdown();
+      }
+    }
+  }
+
+  @Override
+  public Task getTask(TaskAttemptID taskid) throws IOException {
+    TaskInProgress tip = tasks.get(taskid);
+    if (tip != null) {
+      return tip.getTask();
+    } else {
+      return null;
+    }
+  }
+
+  public void incrementSuperstepCount(TaskAttemptID taskid) throws IOException {
+    TaskInProgress tip = tasks.get(taskid);
+    tip.getStatus().incrementSuperstepCount();
+  }
+
+  @Override
+  public boolean ping(TaskAttemptID taskid) throws IOException {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  @Override
+  public void done(TaskAttemptID taskid, boolean shouldBePromoted)
+      throws IOException {
+    // TODO Auto-generated method stub
+  }
+
+  @Override
+  public void fsError(TaskAttemptID taskId, String message) throws IOException {
+    // TODO Auto-generated method stub
+  }
+
+  @Override
+  public PeerNames getAllPeerNames() {
+    return allPeerNames;
+  }
+}
\ No newline at end of file

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerAction.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic directive from the {@link org.apache.hama.bsp.BSPMaster}
+ * to the {@link org.apache.hama.bsp.GroomServer} to take some 'action'. 
+ */
+abstract class GroomServerAction implements Writable {
+  
+  /**
+   * Ennumeration of various 'actions' that the {@link BSPMaster}
+   * directs the {@link GroomServer} to perform periodically.
+   * 
+   */
+  public static enum ActionType {
+    /** Launch a new task. */
+    LAUNCH_TASK,
+    
+    /** Kill a task. */
+    KILL_TASK,
+    
+    /** Kill any tasks of this job and cleanup. */
+    KILL_JOB,
+    
+    /** Reinitialize the groomserver. */
+    REINIT_GROOM,
+
+    /** Ask a task to save its output. */
+    COMMIT_TASK
+  };
+  
+  /**
+   * A factory-method to create objects of given {@link ActionType}. 
+   * @param actionType the {@link ActionType} of object to create.
+   * @return an object of {@link ActionType}.
+   */
+  public static GroomServerAction createAction(ActionType actionType) {
+    GroomServerAction action = null;
+    
+    switch (actionType) {
+    case LAUNCH_TASK:
+      {
+        action = new LaunchTaskAction();
+      }
+      break;
+    case KILL_TASK:
+      {
+        action = new KillTaskAction();
+      }
+      break;
+    case KILL_JOB:
+      {
+        action = new KillJobAction();
+      }
+      break;
+    case REINIT_GROOM:
+      {
+        action = new ReinitGroomAction();
+      }
+      break;
+    case COMMIT_TASK:
+      {
+        action = new CommitTaskAction();
+      }
+      break;
+    }
+
+    return action;
+  }
+  
+  private ActionType actionType;
+  
+  protected GroomServerAction(ActionType actionType) {
+    this.actionType = actionType;
+  }
+  
+  /**
+   * Return the {@link ActionType}.
+   * @return the {@link ActionType}.
+   */
+  ActionType getActionType() {
+    return actionType;
+  }
+
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum(out, actionType);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    actionType = WritableUtils.readEnum(in, ActionType.class);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hama.ipc.GroomProtocol;
+
+/**
+ * Manages information about the {@link GroomServer}s in the cluster 
+ * environment. This interface is not intended to be implemented by users.
+ */
+interface GroomServerManager {
+
+  /**
+   * Get the current status of the cluster
+   * @param detailed if true then report groom names as well
+   * @return summary of the state of the cluster
+   */
+  ClusterStatus getClusterStatus(boolean detailed);
+
+  /**
+   * Find WorkerProtocol with corresponded groom server status
+   * 
+   * @param groomId The identification value of GroomServer 
+   * @return GroomServerStatus 
+   */
+  GroomProtocol findGroomServer(GroomServerStatus status);
+
+  /**
+   * Find the collection of groom servers.
+   * 
+   * @return Collection of groom servers list.
+   */
+  Collection<GroomProtocol> findGroomServers();
+
+  /**
+   * Collection of GroomServerStatus as the key set.
+   *
+   * @return Collection of GroomServerStatus.
+   */
+  Collection<GroomServerStatus> groomServerStatusKeySet();
+
+  /**
+   * Registers a JobInProgressListener to GroomServerManager. Therefore,
+   * adding a JobInProgress will trigger the jobAdded function.
+   * @param the JobInProgressListener listener to be added.
+   */
+  void addJobInProgressListener(JobInProgressListener listener);
+
+  /**
+   * Unregisters a JobInProgressListener to GroomServerManager. Therefore,
+   * the remove of a JobInProgress will trigger the jobRemoved action.
+   * @param the JobInProgressListener to be removed.
+   */
+  void removeJobInProgressListener(JobInProgressListener listener);
+
+  /**
+   * Current GroomServer Peers.
+   * @return GroomName and PeerName(host:port) in pair. 
+   */
+  Map<String, String> currentGroomServerPeers();
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * A GroomServerStatus is a BSP primitive. Keeps info on a BSPMaster. The
+ * BSPMaster maintains a set of the most recent GroomServerStatus objects for
+ * each unique GroomServer it knows about.
+ */
+public class GroomServerStatus implements Writable {
+  public static final Log LOG = LogFactory.getLog(GroomServerStatus.class);
+
+  static {
+    WritableFactories.setFactory(GroomServerStatus.class,
+        new WritableFactory() {
+          public Writable newInstance() {
+            return new GroomServerStatus();
+          }
+        });
+  }
+
+  String groomName;
+  String peerName;
+  String rpcServer;
+  int failures;
+  List<TaskStatus> taskReports;
+
+  volatile long lastSeen;
+  private int maxTasks;
+
+  public GroomServerStatus() {
+    // taskReports = new ArrayList<TaskStatus>();
+    taskReports = new CopyOnWriteArrayList<TaskStatus>();
+  }
+
+  public GroomServerStatus(String groomName, String peerName,
+      List<TaskStatus> taskReports, int failures, int maxTasks) {
+    this(groomName, peerName, taskReports, failures, maxTasks, "");
+  }
+
+  public GroomServerStatus(String groomName, String peerName,
+      List<TaskStatus> taskReports, int failures, int maxTasks, String rpc) {
+    this.groomName = groomName;
+    this.peerName = peerName;
+    this.taskReports = new ArrayList<TaskStatus>(taskReports);
+    this.failures = failures;
+    this.maxTasks = maxTasks;
+    this.rpcServer = rpc;
+  }
+
+  public String getGroomName() {
+    return groomName;
+  }
+
+  /**
+   * The host (and port) from where the groom server can be reached.
+   * 
+   * @return The groom server address in the form of "hostname:port"
+   */
+  public String getPeerName() {
+    return peerName;
+  }
+
+  public String getRpcServer() {
+    return rpcServer;
+  }
+
+  /**
+   * Get the current tasks at the GroomServer. Tasks are tracked by a
+   * {@link TaskStatus} object.
+   * 
+   * @return a list of {@link TaskStatus} representing the current tasks at the
+   *         GroomServer.
+   */
+  public List<TaskStatus> getTaskReports() {
+    return taskReports;
+  }
+
+  public int getFailures() {
+    return failures;
+  }
+
+  public long getLastSeen() {
+    return lastSeen;
+  }
+
+  public void setLastSeen(long lastSeen) {
+    this.lastSeen = lastSeen;
+  }
+
+  public int getMaxTasks() {
+    return maxTasks;
+  }
+
+  /**
+   * Return the current MapTask count
+   */
+  public int countTasks() {
+    int taskCount = 0;
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+      TaskStatus ts = it.next();
+      TaskStatus.State state = ts.getRunState();
+      if (state == TaskStatus.State.RUNNING
+          || state == TaskStatus.State.UNASSIGNED) {
+        taskCount++;
+      }
+    }
+
+    return taskCount;
+  }
+
+  /**
+   * For BSPMaster to distinguish between different GroomServers, because
+   * BSPMaster stores using GroomServerStatus as key.
+   */
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 37 * result + groomName.hashCode();
+    result = 37 * result + peerName.hashCode();
+    result = 37 * result + rpcServer.hashCode();
+    /*
+     * result = 37*result + (int)failures; result = 37*result +
+     * taskReports.hashCode(); result = 37*result +
+     * (int)(lastSeen^(lastSeen>>>32)); result = 37*result + (int)maxTasks;
+     */
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this)
+      return true;
+    if (null == o)
+      return false;
+    if (getClass() != o.getClass())
+      return false;
+
+    GroomServerStatus s = (GroomServerStatus) o;
+    if (!s.groomName.equals(groomName))
+      return false;
+    if (!s.peerName.equals(peerName))
+      return false;
+    if (!s.rpcServer.equals(rpcServer))
+      return false;
+    /*
+     * if(s.failures != failures) return false; if(null == s.taskReports){
+     * if(null != s.taskReports) return false; }else
+     * if(!s.taskReports.equals(taskReports)){ return false; } if(s.lastSeen !=
+     * lastSeen) return false; if(s.maxTasks != maxTasks) return false;
+     */
+    return true;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.groomName = Text.readString(in);
+    this.peerName = Text.readString(in);
+    this.rpcServer = Text.readString(in);
+    this.failures = in.readInt();
+    this.maxTasks = in.readInt();
+    taskReports.clear();
+    int numTasks = in.readInt();
+
+    TaskStatus status;
+    for (int i = 0; i < numTasks; i++) {
+      status = new TaskStatus();
+      status.readFields(in);
+      taskReports.add(status);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, groomName);
+    Text.writeString(out, peerName);
+    Text.writeString(out, rpcServer);
+    out.writeInt(failures);
+    out.writeInt(maxTasks);
+    out.writeInt(taskReports.size());
+    for (TaskStatus taskStatus : taskReports) {
+      taskStatus.write(out);
+    }
+  }
+
+  public Iterator<TaskStatus> taskReports() {
+    return taskReports.iterator();
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ID.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ID.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ID.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A general identifier, which internally stores the id as an integer. This is
+ * the super class of {@link BSPJobID}, {@link TaskID} and {@link TaskAttemptID}
+ * .
+ */
+public abstract class ID implements WritableComparable<ID> {
+  protected static final char SEPARATOR = '_';
+  protected int id;
+
+  public ID(int id) {
+    this.id = id;
+  }
+
+  protected ID() {
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(id);
+  }
+
+  @Override
+  public int hashCode() {
+    return Integer.valueOf(id).hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null)
+      return false;
+    if (o.getClass() == this.getClass()) {
+      ID that = (ID) o;
+      return this.id == that.id;
+    } else
+      return false;
+  }
+
+  public int compareTo(ID that) {
+    return this.id - that.id;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.id = in.readInt();
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerDoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerDoubleMessage.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerDoubleMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerDoubleMessage.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A message that consists of a int tag and a double data. 
+ */
+public class IntegerDoubleMessage extends BSPMessage {
+
+  int tag;
+  double data;
+
+  public IntegerDoubleMessage() {
+    super();
+  }
+
+  public IntegerDoubleMessage(int tag, double data) {
+    super();
+    this.tag = tag;
+    this.data = data;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(tag);
+    out.writeDouble(data);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    tag = in.readInt();
+    data = in.readDouble();
+  }
+
+  @Override
+  public Integer getTag() {
+    return tag;
+  }
+
+  @Override
+  public Double getData() {
+    return data;
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerMessage.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerMessage.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/IntegerMessage.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A message that consists of a string tag and a int data. 
+ */
+public class IntegerMessage extends BSPMessage {
+
+  String tag;
+  int data;
+
+  public IntegerMessage() {
+    super();
+  }
+
+  public IntegerMessage(String tag, int data) {
+    super();
+    this.tag = tag;
+    this.data = data;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(tag);
+    out.writeInt(data);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    tag = in.readUTF();
+    data = in.readInt();
+  }
+
+  @Override
+  public String getTag() {
+    return tag;
+  }
+
+  @Override
+  public Integer getData() {
+    return data;
+  }
+
+}
\ No newline at end of file

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobChangeEvent.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobChangeEvent.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobChangeEvent.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobChangeEvent.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * {@link JobChangeEvent} is used to capture state changes in a job. A job can
+ * change its state w.r.t priority, progress, run-state etc.
+ */
+abstract class JobChangeEvent {
+  private JobInProgress jip;
+
+  JobChangeEvent(JobInProgress jip) {
+    this.jip = jip;
+  }
+
+  /**
+   * Get the job object for which the change is reported
+   */
+  JobInProgress getJobInProgress() {
+    return jip;
+  }
+}