You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/04/28 05:47:19 UTC

svn commit: r1097314 - in /incubator/hama/trunk: ./ conf/ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/ src/test/org/apache/hama/ src/test/org/apache/hama/bsp/ src/test/testjar/

Author: edwardyoon
Date: Thu Apr 28 03:47:18 2011
New Revision: 1097314

URL: http://svn.apache.org/viewvc?rev=1097314&view=rev
Log:
Refactor BSPMaster and GroomServer

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveException.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveHandler.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/DispatchTasksDirective.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/ReportGroomStatusDirective.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/src/test/testjar/
    incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java
    incubator/hama/trunk/src/test/testjar/testjob.jar   (with props)
Removed:
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java
    incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
    incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java
    incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Apr 28 03:47:18 2011
@@ -10,6 +10,7 @@ Release 0.3 - Unreleased
 
   IMPROVEMENTS
 
+    HAMA-376: Refactor BSPMaster and GroomServer (ChiaHung Lin via edwardyoon)
     HAMA-382: Refactor HAMA POM (Tommaso Teofili)
     HAMA-380: Send messages in batches to reduce RPC overhead (Miklos Erdelyi via edwardyoon)
     HAMA-362: Re-design a new data structure of BSPMessage (Thomas Jungblut via edwardyoon)   

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Thu Apr 28 03:47:18 2011
@@ -24,7 +24,7 @@
 <configuration>
   <property>
     <name>bsp.master.address</name>
-    <value>localhost</value>
+    <value>local</value>
     <description>The address of the bsp master server. Either the
     literal string "local" or a host[:port] (where host is a name or
     IP address) for distributed mode.

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Thu Apr 28 03:47:18 2011
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -52,7 +54,7 @@ import org.apache.hama.ipc.WorkerProtoco
  * BSPMaster is responsible to control all the groom servers and to manage bsp
  * jobs.
  */
-public class BSPMaster implements JobSubmissionProtocol, MasterProtocol, // InterServerProtocol,
+public class BSPMaster implements JobSubmissionProtocol, MasterProtocol, 
     GroomServerManager {
   public static final Log LOG = LogFactory.getLog(BSPMaster.class);
 
@@ -98,9 +100,114 @@ public class BSPMaster implements JobSub
   private TaskScheduler taskScheduler;
 
   // GroomServers cache
-  protected ConcurrentMap<GroomServerStatus, WorkerProtocol> groomServers = new ConcurrentHashMap<GroomServerStatus, WorkerProtocol>();
+  protected ConcurrentMap<GroomServerStatus, WorkerProtocol> groomServers =
+    new ConcurrentHashMap<GroomServerStatus, WorkerProtocol>();
 
-  private final List<JobInProgressListener> jobInProgressListeners = new CopyOnWriteArrayList<JobInProgressListener>();
+  private Instructor instructor;
+
+  private final List<JobInProgressListener> jobInProgressListeners = 
+    new CopyOnWriteArrayList<JobInProgressListener>();
+
+  private class ReportGroomStatusHandler implements DirectiveHandler{
+
+    public void handle(Directive directive) throws DirectiveException{
+      // update GroomServerStatus held in the groomServers cache.
+      GroomServerStatus groomStatus = 
+        ((ReportGroomStatusDirective)directive).getStatus();
+      // groomServers cache contains groom server status reported back
+      if (groomServers.containsKey(groomStatus)) {
+        GroomServerStatus tmpStatus = null;
+        for (GroomServerStatus old : groomServers.keySet()) {
+          if (old.equals(groomStatus)) {
+            tmpStatus = groomStatus;
+            updateGroomServersKey(old, tmpStatus);
+            break;
+          }
+        }// for
+        if (null != tmpStatus) {
+          List<TaskStatus> tlist = tmpStatus.getTaskReports();
+          for (TaskStatus ts : tlist) {
+            JobInProgress jip = whichJob(ts.getJobId());
+            TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts
+                .getTaskId()).getTaskID());
+
+            if(ts.getRunState() == TaskStatus.State.SUCCEEDED) {
+              jip.completedTask(tip, ts);
+            } else if (ts.getRunState() == TaskStatus.State.RUNNING) {
+              // do nothing
+            } else if (ts.getRunState() == TaskStatus.State.FAILED) {
+              jip.status.setRunState(JobStatus.FAILED);
+              jip.failedTask(tip, ts);
+            }
+            if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+              for(JobInProgressListener listener: jobInProgressListeners){
+                try {
+                  listener.jobRemoved(jip);
+                } catch (IOException ioe) {
+                  LOG.error("Fail to alter scheduler a job is moved.", ioe);
+                }
+              }
+            } else if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
+              jip.getStatus().setprogress(ts.getSuperstepCount());
+            } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
+              WorkerProtocol worker = findGroomServer(tmpStatus);
+              Directive d1 = new DispatchTasksDirective(currentGroomServerPeers(), 
+                  new GroomServerAction[] {new KillTaskAction(ts.getTaskId()) });
+              try{
+                worker.dispatch(d1);
+              }catch(IOException ioe){
+                throw new DirectiveException("Error when dispatching kill task"+
+                " action.", ioe);
+              }
+            }
+          }
+        } else {
+          throw new RuntimeException("BSPMaster contains GroomServerSatus, "
+              + "but fail to retrieve it.");
+        }
+      } else {
+        throw new RuntimeException("GroomServer not found." + 
+        groomStatus.getGroomName());
+      }
+    }
+  }
+
+  private class Instructor extends Thread{
+    private final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
+    private final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = 
+      new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
+
+    public void bind(Class<? extends Directive> instruction, 
+        DirectiveHandler handler){
+      handlers.putIfAbsent(instruction, handler);
+    }
+
+    public void put(Directive directive){
+      try{
+        buffer.put(directive);
+      }catch(InterruptedException ie){
+        LOG.error("Fail to put directive into queue.", ie); 
+      }
+    }
+
+    public void run(){
+      while(true){
+        try{
+          Directive directive = this.buffer.take(); 
+          if(directive instanceof ReportGroomStatusDirective){
+            ((DirectiveHandler)handlers.get(ReportGroomStatusDirective.class)).handle(directive);
+          }else{
+            throw new RuntimeException("Directive is not supported."+directive);
+          }
+        }catch(InterruptedException ie){
+          LOG.error("Unable to retrieve directive from the queue.", ie);
+          Thread.currentThread().interrupt();
+        }catch(Exception e){
+          LOG.error("Fail to execute directive command.", e);
+        }
+      }
+    }
+  }
 
   /**
    * Start the BSPMaster process, listen on the indicated hostname/port
@@ -114,7 +221,6 @@ public class BSPMaster implements JobSub
       InterruptedException {
     this.conf = conf;
     this.masterIdentifier = identifier;
-    // expireLaunchingTaskThread.start();
 
     // Create the scheduler and init scheduler services
     Class<? extends TaskScheduler> schedulerClass = conf.getClass(
@@ -228,72 +334,7 @@ public class BSPMaster implements JobSub
 
   @Override
   public boolean report(Directive directive) throws IOException {
-    // check returned directive type if equals response
-    if (directive.getType().value() != Directive.Type.Response.value()) {
-      throw new IllegalStateException("GroomServer should report()"
-          + " with Response. Current report type:" + directive.getType());
-    }
-    // update GroomServerStatus hold in groomServers cache.
-    GroomServerStatus fstus = directive.getStatus();
-
-    // groomServers cache contains groom server status reported back
-    if (groomServers.containsKey(fstus)) {
-      GroomServerStatus ustus = null;
-      for (GroomServerStatus old : groomServers.keySet()) {
-        if (old.equals(fstus)) {
-          ustus = fstus;
-          updateGroomServersKey(old, ustus);
-          break;
-        }
-      }// for
-
-      if (null != ustus) {
-        List<TaskStatus> tlist = ustus.getTaskReports();
-        for (TaskStatus ts : tlist) {
-          JobInProgress jip = whichJob(ts.getJobId());
-
-          TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts
-              .getTaskId()).getTaskID());
-          
-          if(ts.getRunState() == TaskStatus.State.SUCCEEDED) {
-            jip.completedTask(tip, ts);
-          } else if (ts.getRunState() == TaskStatus.State.RUNNING) {
-            // do nothing
-          } else if (ts.getRunState() == TaskStatus.State.FAILED) {
-            jip.status.setRunState(JobStatus.FAILED);
-            jip.failedTask(tip, ts);
-          }
-          
-          if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
-            for (JobInProgressListener listener : jobInProgressListeners) {
-              try {
-                listener.jobRemoved(jip);
-              } catch (IOException ioe) {
-                LOG.error("Fail to alter scheduler a job is moved.", ioe);
-              }
-            }
-
-          } else if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
-            jip.getStatus().setprogress(ts.getSuperstepCount());
-          } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
-            
-            WorkerProtocol worker = findGroomServer(ustus);
-            Directive d1 = new Directive(currentGroomServerPeers(),
-                new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
-            
-            worker.dispatch(d1);
-            
-          }
-          
-        }
-      } else {
-        throw new RuntimeException("BSPMaster contains GroomServerSatus, "
-            + "but fail to retrieve it.");
-      }
-    } else {
-      throw new RuntimeException("GroomServer not found."
-          + fstus.getGroomName());
-    }
+    instructor.put(directive); 
     return true;
   }
 
@@ -378,15 +419,20 @@ public class BSPMaster implements JobSub
   }
 
   public void offerService() throws InterruptedException, IOException {
-    // this.interServer.start();
+
     this.masterServer.start();
 
     synchronized (this) {
       state = State.RUNNING;
     }
+    
+    instructor = new Instructor();
+    instructor.bind(ReportGroomStatusDirective.class, 
+      new ReportGroomStatusHandler());
+    instructor.start();
+    
     LOG.info("Starting RUNNING");
 
-    // this.interServer.join();
     this.masterServer.join();
 
     LOG.info("Stopped RPC Master server.");

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Directive.java Thu Apr 28 03:47:18 2011
@@ -34,15 +34,10 @@ import org.apache.hadoop.io.WritableUtil
  * A generic directive from the {@link org.apache.hama.bsp.BSPMaster} to the
  * {@link org.apache.hama.bsp.GroomServer} to take some 'action'.
  */
-public class Directive implements Writable {
+public class Directive implements Writable{
 
-  public static final Log LOG = LogFactory.getLog(Directive.class);
-
-  private long timestamp;
-  private Directive.Type type;
-  private Map<String, String> groomServerPeers;
-  private GroomServerAction[] actions;
-  private GroomServerStatus status;
+  protected long timestamp;
+  protected Directive.Type type;
 
   public static enum Type {
     Request(1), Response(2);
@@ -57,22 +52,11 @@ public class Directive implements Writab
     }
   };
 
-  public Directive() {
-    this.timestamp = System.currentTimeMillis();
-  }
+  public Directive(){}
 
-  public Directive(Map<String, String> groomServerPeers,
-      GroomServerAction[] actions) {
-    this();
-    this.type = Directive.Type.Request;
-    this.groomServerPeers = groomServerPeers;
-    this.actions = actions;
-  }
-
-  public Directive(GroomServerStatus status) {
-    this();
-    this.type = Directive.Type.Response;
-    this.status = status;
+  public Directive(Directive.Type type) {
+    this.timestamp = System.currentTimeMillis();
+    this.type = type;
   }
 
   public long getTimestamp() {
@@ -83,48 +67,14 @@ public class Directive implements Writab
     return this.type;
   }
 
-  public Map<String, String> getGroomServerPeers() {
-    return this.groomServerPeers;
-  }
-
-  public GroomServerAction[] getActions() {
-    return this.actions;
-  }
-
-  public GroomServerStatus getStatus() {
-    return this.status;
-  }
+  /**
+   * Command for BSPMaster or GroomServer to execute.
+  public abstract void execute() throws Exception;
+   */
 
   public void write(DataOutput out) throws IOException {
     out.writeLong(this.timestamp);
     out.writeInt(this.type.value());
-    if (getType().value() == Directive.Type.Request.value()) {
-      if (this.actions == null) {
-        WritableUtils.writeVInt(out, 0);
-      } else {
-        WritableUtils.writeVInt(out, actions.length);
-        for (GroomServerAction action : this.actions) {
-          WritableUtils.writeEnum(out, action.getActionType());
-          action.write(out);
-        }
-      }
-      String[] groomServerNames = groomServerPeers.keySet().toArray(
-          new String[0]);
-      WritableUtils.writeCompressedStringArray(out, groomServerNames);
-
-      List<String> groomServerAddresses = new ArrayList<String>(
-          groomServerNames.length);
-      for (String groomName : groomServerNames) {
-        groomServerAddresses.add(groomServerPeers.get(groomName));
-      }
-      WritableUtils.writeCompressedStringArray(out, groomServerAddresses
-          .toArray(new String[0]));
-    } else if (getType().value() == Directive.Type.Response.value()) {
-      this.status.write(out);
-    } else {
-      throw new IllegalStateException("Wrong directive type:" + getType());
-    }
-
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -132,32 +82,8 @@ public class Directive implements Writab
     int t = in.readInt();
     if (Directive.Type.Request.value() == t) {
       this.type = Directive.Type.Request;
-      int length = WritableUtils.readVInt(in);
-      if (length > 0) {
-        this.actions = new GroomServerAction[length];
-        for (int i = 0; i < length; ++i) {
-          GroomServerAction.ActionType actionType = WritableUtils.readEnum(in,
-              GroomServerAction.ActionType.class);
-          actions[i] = GroomServerAction.createAction(actionType);
-          actions[i].readFields(in);
-        }
-      } else {
-        this.actions = null;
-      }
-      String[] groomServerNames = WritableUtils.readCompressedStringArray(in);
-      String[] groomServerAddresses = WritableUtils
-          .readCompressedStringArray(in);
-      groomServerPeers = new HashMap<String, String>(groomServerNames.length);
-
-      for (int i = 0; i < groomServerNames.length; i++) {
-        groomServerPeers.put(groomServerNames[i], groomServerAddresses[i]);
-      }
-    } else if (Directive.Type.Response.value() == t) {
+    }else{
       this.type = Directive.Type.Response;
-      this.status = new GroomServerStatus();
-      this.status.readFields(in);
-    } else {
-      throw new IllegalStateException("Wrong directive type:" + t);
     }
   }
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveException.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveException.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveException.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveException.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+public class DirectiveException extends RuntimeException{
+
+  public DirectiveException(){ 
+    super(); 
+  }
+
+  public DirectiveException(String message){ 
+    super(message); 
+  }
+
+  public DirectiveException(String message, Throwable t){
+    super(message, t);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveHandler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveHandler.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveHandler.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/DirectiveHandler.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+public interface DirectiveHandler{
+
+  /**
+   * Handle directives on demand. 
+   * @param directive to be handled.
+   */
+  void handle(Directive directive) throws DirectiveException;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/DispatchTasksDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/DispatchTasksDirective.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/DispatchTasksDirective.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/DispatchTasksDirective.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public final class DispatchTasksDirective extends Directive implements Writable {
+
+  public static final Log LOG = LogFactory.getLog(DispatchTasksDirective.class);
+
+  private Map<String, String> groomServerPeers;
+  private GroomServerAction[] actions;
+
+  public DispatchTasksDirective(){ super(); }
+  
+  public DispatchTasksDirective(Map<String, String> groomServerPeers,
+      GroomServerAction[] actions) {
+    super(Directive.Type.Request);
+    this.groomServerPeers = groomServerPeers;
+    this.actions = actions;
+  }
+
+  public Map<String, String> getGroomServerPeers() {
+    return this.groomServerPeers;
+  }
+
+  public GroomServerAction[] getActions() {
+    return this.actions;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    if (this.actions == null) {
+      WritableUtils.writeVInt(out, 0);
+    } else {
+      WritableUtils.writeVInt(out, actions.length);
+      for (GroomServerAction action : this.actions) {
+        WritableUtils.writeEnum(out, action.getActionType());
+        action.write(out);
+      }
+    }
+    String[] groomServerNames = groomServerPeers.keySet().toArray(
+        new String[0]);
+    WritableUtils.writeCompressedStringArray(out, groomServerNames);
+
+    List<String> groomServerAddresses = new ArrayList<String>(
+        groomServerNames.length);
+    for (String groomName : groomServerNames) {
+      groomServerAddresses.add(groomServerPeers.get(groomName));
+    }
+    WritableUtils.writeCompressedStringArray(out, groomServerAddresses
+        .toArray(new String[0]));
+
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    int length = WritableUtils.readVInt(in);
+    if (length > 0) {
+      this.actions = new GroomServerAction[length];
+      for (int i = 0; i < length; ++i) {
+        GroomServerAction.ActionType actionType = WritableUtils.readEnum(in,
+            GroomServerAction.ActionType.class);
+        actions[i] = GroomServerAction.createAction(actionType);
+        actions[i].readFields(in);
+      }
+    } else {
+      this.actions = null;
+    }
+    String[] groomServerNames = WritableUtils.readCompressedStringArray(in);
+    String[] groomServerAddresses = WritableUtils
+        .readCompressedStringArray(in);
+    groomServerPeers = new HashMap<String, String>(groomServerNames.length);
+
+    for (int i = 0; i < groomServerNames.length; i++) {
+      groomServerPeers.put(groomServerNames[i], groomServerAddresses[i]);
+    }
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Thu Apr 28 03:47:18 2011
@@ -32,6 +32,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -44,8 +46,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DiskChecker;
@@ -93,6 +93,7 @@ public class GroomServer implements Runn
   String groomServerName;
   String localHostname;
   InetSocketAddress bspMasterAddr;
+  private Instructor instructor;
 
   // Filesystem
   // private LocalDirAllocator localDirAllocator;
@@ -122,15 +123,89 @@ public class GroomServer implements Runn
 
   private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
 
+  private class DispatchTasksHandler implements DirectiveHandler {
+
+    public void handle(Directive directive) throws DirectiveException {
+      GroomServerAction[] actions = ((DispatchTasksDirective) directive)
+          .getActions();
+      synchronized (bspPeer) {
+        bspPeer.setAllPeerNames(((DispatchTasksDirective) directive)
+            .getGroomServerPeers().values());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got Response from BSPMaster with "
+            + ((actions != null) ? actions.length : 0) + " actions");
+      }
+      if (actions != null) {
+        for (GroomServerAction action : actions) {
+          if (action instanceof LaunchTaskAction) {
+            startNewTask((LaunchTaskAction) action);
+          } else {
+
+            // TODO Use the cleanup thread
+            // tasksToCleanup.put(action);
+
+            KillTaskAction killAction = (KillTaskAction) action;
+            if (tasks.containsKey(killAction.getTaskID())) {
+              TaskInProgress tip = tasks.get(killAction.getTaskID());
+              tip.taskStatus.setRunState(TaskStatus.State.FAILED);
+              try {
+                tip.killAndCleanup(true);
+              } catch (IOException ioe) {
+                throw new DirectiveException("Error when killing a "
+                    + "TaskInProgress.", ioe);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private class Instructor extends Thread {
+    final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
+    final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
+
+    public void bind(Class<? extends Directive> instruction,
+        DirectiveHandler handler) {
+      handlers.putIfAbsent(instruction, handler);
+    }
+
+    public void put(Directive directive) {
+      try {
+        buffer.put(directive);
+      } catch (InterruptedException ie) {
+        LOG.error("Unable to put directive into queue.", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    public void run() {
+      while (true) {
+        try {
+          Directive directive = buffer.take();
+          if (directive instanceof DispatchTasksDirective) {
+            ((DirectiveHandler) handlers.get(DispatchTasksDirective.class))
+                .handle(directive);
+          } else {
+            throw new RuntimeException("Directive is not supported."
+                + directive);
+          }
+        } catch (InterruptedException ie) {
+          LOG.error("Unable to retrieve directive from the queue.", ie);
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          LOG.error("Fail to execute directive.", e);
+        }
+      }
+    }
+  }
+
   public GroomServer(Configuration conf) throws IOException {
     LOG.info("groom start");
     this.conf = conf;
 
-    String mode = conf.get("bsp.master.address");
-    if (!mode.equals("local")) {
-      bspMasterAddr = BSPMaster.getAddress(conf);
-    }
-
+    bspMasterAddr = BSPMaster.getAddress(conf);
     // FileSystem local = FileSystem.getLocal(conf);
     // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
   }
@@ -141,9 +216,8 @@ public class GroomServer implements Runn
     }
 
     if (localHostname == null) {
-      this.localHostname = DNS.getDefaultHost(
-          conf.get("bsp.dns.interface", "default"),
-          conf.get("bsp.dns.nameserver", "default"));
+      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
+          "default"), conf.get("bsp.dns.nameserver", "default"));
     }
     // check local disk
     checkLocalDirs(conf.getStrings("bsp.local.dir"));
@@ -217,6 +291,11 @@ public class GroomServer implements Runn
       throw new IOException("There is a problem in establishing"
           + " communication link with BSPMaster.");
     }
+
+    this.instructor = new Instructor();
+    this.instructor.bind(DispatchTasksDirective.class,
+        new DispatchTasksHandler());
+    instructor.start();
     this.running = true;
     this.initialized = true;
   }
@@ -228,31 +307,10 @@ public class GroomServer implements Runn
 
   @Override
   public void dispatch(Directive directive) throws IOException {
-    // update tasks status
-    GroomServerAction[] actions = directive.getActions();
-    bspPeer.setAllPeerNames(directive.getGroomServerPeers().values());
-    LOG.debug("Got Response from BSPMaster with "
-        + ((actions != null) ? actions.length : 0) + " actions");
-    // perform actions
-    if (actions != null) {
-      for (GroomServerAction action : actions) {
-        if (action instanceof LaunchTaskAction) {
-          startNewTask((LaunchTaskAction) action);
-        } else {
-
-          // TODO Use the cleanup thread
-          // tasksToCleanup.put(action);
-
-          KillTaskAction killAction = (KillTaskAction) action;
-          if (tasks.containsKey(killAction.getTaskID())) {
-            TaskInProgress tip = tasks.get(killAction.getTaskID());
-            tip.taskStatus.setRunState(TaskStatus.State.FAILED);
-            tip.killAndCleanup(true);
-          }
+    if (!instructor.isAlive())
+      throw new IOException();
 
-        }
-      }
-    }
+    instructor.put(directive);
   }
 
   private static void checkLocalDirs(String[] localDirs)
@@ -392,15 +450,16 @@ public class GroomServer implements Runn
    * Update and report refresh status back to BSPMaster.
    */
   public void doReport(TaskStatus taskStatus) {
-    GroomServerStatus gss = new GroomServerStatus(groomServerName,
+    GroomServerStatus groomStatus = new GroomServerStatus(groomServerName,
         bspPeer.getPeerName(), updateTaskStatus(taskStatus), failures,
         maxCurrentTasks, rpcServer);
     try {
-      boolean ret = masterClient.report(new Directive(gss));
+      boolean ret = masterClient.report(new ReportGroomStatusDirective(
+          groomStatus));
       if (!ret) {
         LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
-            + " groom name: " + gss.getGroomName() + " peer name:"
-            + gss.getPeerName() + " rpc server:" + rpcServer);
+            + " groom name: " + groomStatus.getGroomName() + " peer name:"
+            + groomStatus.getPeerName() + " rpc server:" + rpcServer);
       }
     } catch (IOException ioe) {
       LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
@@ -798,8 +857,6 @@ public class GroomServer implements Runn
         throwable.printStackTrace(new PrintStream(baos));
       } finally {
         RPC.stopProxy(umbilical);
-        MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-        metricsContext.close();
         // Shutting down log4j of the child-vm...
         // This assumes that on return from Task.run()
         // there is no more logging done.

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/ReportGroomStatusDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ReportGroomStatusDirective.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ReportGroomStatusDirective.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ReportGroomStatusDirective.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hama.ipc.WorkerProtocol;
+
+public class ReportGroomStatusDirective extends Directive implements Writable {
+
+  public static final Log LOG = LogFactory.getLog(ReportGroomStatusDirective.class);
+
+  private GroomServerStatus status;
+
+  public ReportGroomStatusDirective(){ super(); }
+  
+  public ReportGroomStatusDirective(GroomServerStatus status) {
+    super(Directive.Type.Response);
+    this.status = status;
+  }
+
+  public GroomServerStatus getStatus() {
+    return this.status;
+  }
+
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    this.status.write(out);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.status = new GroomServerStatus();
+    this.status.readFields(in);
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Thu Apr 28 03:47:18 2011
@@ -136,9 +136,9 @@ class SimpleTaskScheduler extends TaskSc
         WorkerProtocol worker = groomServerManager.findGroomServer(this.stus);
         try {
           // dispatch() to the groom server
-          Directive d1 = new Directive(groomServerManager
-              .currentGroomServerPeers(),
-              new GroomServerAction[] { new LaunchTaskAction(t) });
+          Directive d1 = new DispatchTasksDirective(groomServerManager
+              .currentGroomServerPeers(), new GroomServerAction[] { 
+              new LaunchTaskAction(t)});
           worker.dispatch(d1);
         } catch (IOException ioe) {
           LOG.error("Fail to dispatch tasks to GroomServer "

Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java Thu Apr 28 03:47:18 2011
@@ -22,8 +22,7 @@ import java.io.IOException;
 import org.apache.hama.bsp.Directive;
 
 /**
- * A protocol for BSPMaster talks to GroomServer. This protocol 
- * allow BSPMaster dispatch tasks to a GroomServer.
+ * A protocol for BSPMaster talks to GroomServer. 
  */
 public interface WorkerProtocol extends HamaRPCProtocolVersion {
 

Modified: incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java Thu Apr 28 03:47:18 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 public abstract class HamaClusterTestCase extends HamaTestCase {
   public static final Log LOG = LogFactory.getLog(HamaClusterTestCase.class);
   protected MiniDFSCluster dfsCluster;
+  protected MiniBSPCluster bspCluster;
   protected MiniZooKeeperCluster zooKeeperCluster;
   protected boolean startDfs;
 
@@ -52,6 +53,8 @@ public abstract class HamaClusterTestCas
     this.zooKeeperCluster = new MiniZooKeeperCluster();
     int clientPort = this.zooKeeperCluster.startup(testDir);
     conf.set("hama.zookeeper.property.clientPort", Integer.toString(clientPort));
+    bspCluster = new MiniBSPCluster(this.conf, 2); 
+    bspCluster.startBSPCluster();
   }
 
   @Override
@@ -95,6 +98,7 @@ public abstract class HamaClusterTestCas
       if (startDfs) {
         shutdownDfs(dfsCluster);
       }
+      bspCluster.shutdown();
     } catch (Exception e) {
       LOG.error(e);
     }

Modified: incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaTestCase.java Thu Apr 28 03:47:18 2011
@@ -62,6 +62,10 @@ public abstract class HamaTestCase exten
   
   private void init() {
     conf = new HamaConfiguration();
+    conf.setStrings("bsp.local.dir", "/tmp/hama-test");
+    conf.set("bsp.master.address", "localhost");
+    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    conf.set("bsp.groom.report.address", "127.0.0.1:0");
   }
 
   /**

Modified: incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java?rev=1097314&r1=1097313&r2=1097314&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java Thu Apr 28 03:47:18 2011
@@ -135,11 +135,24 @@ public class MiniBSPCluster {
     int threadpool = conf.getInt("bsp.test.threadpool", 10);
     LOG.info("Thread pool value "+threadpool);
     scheduler = Executors.newScheduledThreadPool(threadpool);
+  }
 
+  public void startBSPCluster(){
     startMaster();
     startGroomServers();
   }
 
+  public void shutdownBSPCluster(){
+    if(null != this.master && this.master.isRunning())
+      this.master.shutdown();
+    if(0 < groomServerList.size()){
+      for(GroomServerRunner groom: groomServerList){
+        if(groom.isRunning()) groom.shutdown();
+      }
+    }
+  }
+
+
   public void startMaster(){
     if(null == this.scheduler) 
       throw new NullPointerException("No ScheduledExecutorService exists.");
@@ -203,6 +216,7 @@ public class MiniBSPCluster {
   }
 
   public void shutdown() {
+    shutdownBSPCluster();
     scheduler.shutdown();
   }
 

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMasterGroomServer.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+
+public class TestBSPMasterGroomServer extends HamaCluster {
+
+  private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
+  private static String TMP_OUTPUT = "/tmp/test-example/";
+  private HamaConfiguration configuration;
+  private String TEST_JOB = "src/test/testjar/testjob.jar";
+
+  public TestBSPMasterGroomServer() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.setStrings("bsp.local.dir", "/tmp/hama-test");
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  public void testSubmitJob() throws Exception {
+    BSPJob bsp = new BSPJob(configuration);
+    bsp.setJobName("Test Serialize Printing");
+    bsp.setBspClass(testjar.ClassSerializePrinting.HelloBSP.class);
+    bsp.setJar(TEST_JOB);
+
+    // Set the task size as a number of GroomServer
+    BSPJobClient jobClient = new BSPJobClient(configuration);
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    bsp.setNumBspTask(cluster.getGroomServers());
+    FileSystem fileSys = FileSystem.get(conf);
+
+    if (bsp.waitForCompletion(true)) {
+      checkOutput(fileSys, cluster, conf);
+    }
+    LOG.info("Client finishes execution job.");
+  }
+
+  private static void checkOutput(FileSystem fileSys, ClusterStatus cluster,
+      HamaConfiguration conf) throws Exception {
+    for (int i = 0; i < cluster.getGroomServers(); i++) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
+          TMP_OUTPUT + i), conf);
+      LongWritable timestamp = new LongWritable();
+      Text message = new Text();
+      reader.next(timestamp, message);
+      assertTrue("Check if `Hello BSP' gets printed.", message.toString()
+          .indexOf("Hello BSP from") >= 0);
+      reader.close();
+    }
+  }
+
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+}

Added: incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java?rev=1097314&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java (added)
+++ incubator/hama/trunk/src/test/testjar/ClassSerializePrinting.java Thu Apr 28 03:47:18 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package testjar;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.zookeeper.KeeperException;
+
+public class ClassSerializePrinting {
+  private static String TMP_OUTPUT = "/tmp/test-example/";
+
+  public static class HelloBSP extends BSP {
+    public static final Log LOG = LogFactory.getLog(HelloBSP.class);
+    private Configuration conf;
+    private final static int PRINT_INTERVAL = 1000;
+    private FileSystem fileSys;
+    private int num;
+
+    public void bsp(BSPPeerProtocol bspPeer) throws IOException,
+        KeeperException, InterruptedException {
+
+      int i = 0;
+      for (String otherPeer : bspPeer.getAllPeerNames()) {
+        String peerName = bspPeer.getPeerName();
+        if (peerName.equals(otherPeer)) {
+          writeLogToFile(peerName, i);
+        }
+
+        Thread.sleep(PRINT_INTERVAL);
+        bspPeer.sync();
+        i++;
+      }
+    }
+
+    private void writeLogToFile(String string, int i) throws IOException {
+      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+          new Path(TMP_OUTPUT + i), LongWritable.class, Text.class,
+          CompressionType.NONE);
+      writer.append(new LongWritable(System.currentTimeMillis()), new Text(
+          "Hello BSP from " + (i + 1) + " of " + num + ": " + string));
+      writer.close();
+    }
+
+    public Configuration getConf() {
+      return conf;
+    }
+
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      num = Integer.parseInt(conf.get("bsp.peers.num"));
+      try {
+        fileSys = FileSystem.get(conf);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+  }
+
+}

Added: incubator/hama/trunk/src/test/testjar/testjob.jar
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/testjar/testjob.jar?rev=1097314&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/hama/trunk/src/test/testjar/testjob.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream